diff --git a/.wordlist.txt b/.wordlist.txt
index fd4abcea3..6fa1cfa93 100644
--- a/.wordlist.txt
+++ b/.wordlist.txt
@@ -186,3 +186,4 @@ webhooks
 WIP
 WIP
 YAML
+dropnull
diff --git a/docs/examples/api-reference/operators_ref.py b/docs/examples/api-reference/operators_ref.py
index e726db923..40ae7af9e 100644
--- a/docs/examples/api-reference/operators_ref.py
+++ b/docs/examples/api-reference/operators_ref.py
@@ -46,6 +46,10 @@ class UserTransactions:
     @pipeline(version=1)
     @inputs(Activity)
     def create_user_transactions(cls, activity: Dataset):
+        # docsnip dropnull
+        dropnull_amounts = activity.dropnull("amount")
+        # /docsnip
+
         # docsnip transform
         def extract_info(df: pd.DataFrame) -> pd.DataFrame:
             df_json = df["metadata"].apply(json.loads).apply(pd.Series)
@@ -55,7 +59,7 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame:
                 ["merchant_id", "transaction_amount", "user_id", "timestamp"]
             ]
 
-        transformed_ds = activity.transform(
+        transformed_ds = dropnull_amounts.transform(
             extract_info,
             schema={
                 "transaction_amount": float,
diff --git a/docs/pages/api-reference/operators.md b/docs/pages/api-reference/operators.md
index 4f22dfec7..1188d8e5b 100644
--- a/docs/pages/api-reference/operators.md
+++ b/docs/pages/api-reference/operators.md
@@ -122,7 +122,8 @@ Fennel allows you to drop columns from a dataset using the `drop` operator.
 
 The `drop` operator has the following parameters:
 
-1. `columns: List[str]` - positional argument, that specifies the list of columns to drop from the dataset.
+1. `columns: List[str]` - keyword or positional argument that specifies the list of columns to drop from the dataset.
+    - You can also use `*args` to pass the column names for added flexibility.
 
 :::info
 Fennel does not allow you to drop keys or timestamp columns from a dataset.
@@ -130,13 +131,36 @@ Fennel does not allow you to drop keys or timestamp columns from a dataset.

 
+
+### Dropnull
+
+Fennel allows you to filter out rows that have null values in specified columns using the `dropnull` operator.
+`dropnull` also changes the type of the specified columns from `Optional[T]` to `T`.
+
+The `dropnull` operator has the following parameters:
+
+1. `columns: List[str]` - keyword or positional argument that specifies the list of columns in which to filter out null values.
+    - You can also use `*args` to pass the column names for added flexibility.
+
+Either `*args` or `columns` (but not both) may be provided as an argument to `dropnull`.
+If no arguments are given, `columns` defaults to all fields of type `Optional[T]` in the dataset.
+
+:::info
+Fennel only allows you to call `dropnull` on columns with an `Optional` type.
+:::
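+
+For example, given a dataset with an `Optional[str]` column, a pipeline can use
+`dropnull` as sketched below (a minimal sketch with hypothetical dataset names;
+imports are elided, as in the other snippets on this page):
+
+```python
+@meta(owner="data-eng@example.com")
+@dataset
+class UserCountry:
+    user_id: int = field(key=True)
+    country: Optional[str]
+    ts: datetime = field(timestamp=True)
+
+
+@meta(owner="data-eng@example.com")
+@dataset
+class UserCountryNonNull:
+    user_id: int = field(key=True)
+    country: str
+    ts: datetime = field(timestamp=True)
+
+    @pipeline(version=1)
+    @inputs(UserCountry)
+    def drop_null_country(cls, users: Dataset):
+        # Drops rows where `country` is null and narrows the column type
+        # from Optional[str] to str; calling dropnull() with no arguments
+        # would do the same for every Optional column.
+        return users.dropnull("country")
+```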
+
+

+
 ### Select
 
-Fennel allows you to drop columns from a dataset using the `select` operator.
+Fennel allows you to select columns from a dataset using the `select` operator.
 
 The `select` operator has the following parameters:
 
-1. `*args: str` - positional arguments that specify the list of fields to select.
+1. `columns: List[str]` - keyword argument that specifies the list of columns to select from the dataset.
+    - You can also use `*args` to pass the column names for added flexibility.
+
+Exactly one of `*args` or `columns` must be provided as an argument to `select`, as sketched below.
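+
+A minimal sketch (the pipeline name is hypothetical; `UserInfoDataset` stands in
+for any input dataset keyed on `user_id`):
+
+```python
+@pipeline(version=1)
+@inputs(UserInfoDataset)
+def select_user_info(cls, info: Dataset):
+    # Keyword form; the positional form info.select("user_id", "name")
+    # is equivalent. The timestamp column is carried along automatically.
+    return info.select(columns=["user_id", "name"])
+```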
 
 :::info
 Fennel requires you to select all key fields, with the timestamp column automatically included.
diff --git a/fennel/client_tests/test_dataset.py b/fennel/client_tests/test_dataset.py
index c5bd06b7f..2a8c87206 100644
--- a/fennel/client_tests/test_dataset.py
+++ b/fennel/client_tests/test_dataset.py
@@ -58,14 +58,31 @@ def get_info(cls, info: Dataset):
 class UserInfoDatasetDerivedSelect:
     user_id: int = field(key=True).meta(description="User ID")  # type: ignore
     name: str = field().meta(description="User name")  # type: ignore
-    country_name: Optional[str]
+    country_name: str
     ts: datetime = field(timestamp=True)
 
     @pipeline(version=1)
     @inputs(UserInfoDataset)
     def get_info(cls, info: Dataset):
         x = info.rename({"country": "country_name", "timestamp": "ts"})
-        return x.select("user_id", "name", "country_name")
+        return x.select("user_id", "name", "country_name").dropnull(
+            "country_name"
+        )
+
+
+@meta(owner="test@test.com")
+@dataset
+class UserInfoDatasetDerivedDropnull:
+    user_id: int = field(key=True).meta(description="User ID")  # type: ignore
+    name: str = field().meta(description="User name")  # type: ignore
+    country: str
+    age: int
+    timestamp: datetime = field(timestamp=True)
+
+    @pipeline(version=1)
+    @inputs(UserInfoDataset)
+    def get_info(cls, info: Dataset):
+        return info.dropnull()
 
 
 class TestDataset(unittest.TestCase):
@@ -103,6 +120,9 @@ def test_simple_select_rename(self, client):
         data = [
             [18232, "Ross", 32, "USA", now],
             [18234, "Monica", 24, "Chile", yesterday],
+            [18235, "Jessica", 24, None, yesterday],
+            [18236, "Jane", 24, None, yesterday],
+            [18237, "Chandler", 24, None, yesterday],
         ]
         columns = ["user_id", "name", "age", "country", "timestamp"]
         df = pd.DataFrame(data, columns=columns)
@@ -134,6 +154,71 @@ def test_simple_select_rename(self, client):
         assert all(x in df.columns for x in ["user_id", "name", "country_name"])
         assert "age" not in df.columns
 
+    @pytest.mark.integration
+    @mock
+    def test_simple_drop_null(self, client):
+        # Sync the dataset
+        client.sync(datasets=[UserInfoDataset, UserInfoDatasetDerivedDropnull])
+        now = datetime.now()
+        data = [
+            [18232, "Ross", 32, "USA", now],
+            [18234, "Monica", None, "Chile", now],
+            [18235, "Jessica", 24, None, now],
+            [18236, "Jane", 24, None, now],
+            [18237, "Chandler", 24, None, now],
+            [18238, "Mike", 32, "UK", now],
+        ]
+        columns = ["user_id", "name", "age", "country", "timestamp"]
+        df = pd.DataFrame(data, columns=columns)
+        df["age"] = df["age"].astype(pd.Int64Dtype())
+        response = client.log("fennel_webhook", "UserInfoDataset", df)
+        assert response.status_code == requests.codes.OK, response.json()
+
+        # Do lookup on UserInfoDataset
+        if client.is_integration_client():
+            client.sleep()
+        ts = pd.Series([now, now, now, now, now, now])
+        user_id_keys = pd.Series([18232, 18234, 18235, 18236, 18237, 18238])
+
+        df, found = UserInfoDataset.lookup(ts, user_id=user_id_keys)
+        assert df.shape == (6, 5)
+        assert df["user_id"].tolist() == [
+            18232,
+            18234,
+            18235,
+            18236,
+            18237,
+            18238,
+        ]
+        assert df["name"].tolist() == [
+            "Ross",
+            "Monica",
+            "Jessica",
+            "Jane",
+            "Chandler",
+            "Mike",
+        ]
+        assert df["age"].tolist() == [32, None, 24, 24, 24, 32]
+        assert df["country"].tolist() == [
+            "USA",
+            "Chile",
+            None,
+            None,
+            None,
+            "UK",
+        ]
+
+        # Do lookup on UserInfoDatasetDerivedDropnull
+        df, found = UserInfoDatasetDerivedDropnull.lookup(
+            ts, user_id=user_id_keys
+        )
+
+        # TODO (thaqib)
+        assert df.shape == (6, 5)
+        assert df["user_id"].tolist() == user_id_keys.tolist()
+        assert df["name"].tolist() == ["Ross", None, None, None, None, "Mike"]
+        assert df["country"].tolist() == ["USA", None, None, None, None, "UK"]
+
     @pytest.mark.integration
     @mock
     def test_simple_drop_rename(self, client):
diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py
index 4a48f0ac2..556c8f89e 100644
--- a/fennel/datasets/datasets.py
+++ b/fennel/datasets/datasets.py
@@ -6,6 +6,7 @@
 import inspect
 import sys
 from dataclasses import dataclass
+import typing
 
 import numpy as np
 import pandas as pd
@@ -57,6 +58,7 @@
     FENNEL_STRUCT_SRC_CODE,
     FENNEL_STRUCT_DEPENDENCIES_SRC_CODE,
 )
+
 from fennel.sources.sources import DataConnector, source
 from fennel.utils import (
     fhash,
@@ -119,19 +121,7 @@ def meta(self, **kwargs: Any) -> T:  # type: ignore
         return f
 
     def is_optional(self) -> bool:
-        def _get_origin(type_: Any) -> Any:
-            return getattr(type_, "__origin__", None)
-
-        def _get_args(type_: Any) -> Any:
-            return getattr(type_, "__args__", None)
-
-        if (
-            _get_origin(self.dtype) is Union
-            and type(None) is _get_args(self.dtype)[1]
-        ):
-            return True
-
-        return False
+        return fennel_is_optional(self.dtype)
 
     def fqn(self) -> str:
         return f"{self.dataset_name}.{self.name}"
@@ -241,6 +231,16 @@ def drop(self, *args, columns: Optional[List[str]] = None) -> _Node:
         drop_cols = _Node.__get_drop_args(*args, columns=columns)
         return self.__drop(drop_cols)
 
+    def dropnull(self, *args, columns: Optional[List[str]] = None) -> _Node:
+        cols = None
+        if len(args) == 0 and columns is None:  # dropnull with no args
+            cols = self.dsschema().get_optional_cols()
+        else:
+            cols = _Node.__get_drop_args(
+                *args, columns=columns, name="dropnull"
+            )
+        return DropNull(self, cols)
+
     def select(self, *args, columns: Optional[List[str]] = None) -> _Node:
         cols = _Node.__get_drop_args(*args, columns=columns, name="select")
         ts = self.dsschema().timestamp
@@ -693,6 +693,23 @@ def name(self):
         return self.__name
 
 
+class DropNull(_Node):
+    def __init__(self, node: _Node, columns: List[str]):
+        super().__init__()
+        self.node = node
+        self.columns = columns
+        self.node.out_edges.append(self)
+
+    def signature(self):
+        return fhash(self.node.signature(), self.columns)
+
+    def dsschema(self):
+        input_schema = copy.deepcopy(self.node.dsschema())
+        for field in self.columns:
+            input_schema.drop_null_column(field)
+        return input_schema
+
+
 # ---------------------------------------------------------------------
 # dataset & pipeline decorators
 # ---------------------------------------------------------------------
@@ -893,6 +910,17 @@ def wrap(c: Type[T]) -> Dataset:
     return wrap(cls)
 
 
+def fennel_is_optional(type_):
+    return (
+        typing.get_origin(type_) is Union
+        and type(None) is typing.get_args(type_)[1]
+    )
+
+
+def fennel_get_optional_inner(type_):
+    return typing.get_args(type_)[0]
+
+
 # Fennel implementation of get_type_hints which does not error on forward
 # references not being types such as Embedding[4].
 def f_get_type_hints(obj):
@@ -1456,6 +1484,8 @@ def visit(self, obj):
             return self.visitExplode(obj)
         elif isinstance(obj, First):
             return self.visitFirst(obj)
+        elif isinstance(obj, DropNull):
+            return self.visitDropNull(obj)
         elif isinstance(obj, Assign):
             return self.visitAssign(obj)
         else:
@@ -1497,6 +1527,9 @@ def visitExplode(self, obj):
     def visitFirst(self, obj):
         raise NotImplementedError()
 
+    def visitDropNull(self, obj):
+        raise NotImplementedError()
+
     def visitAssign(self, obj):
         raise NotImplementedError()
 
@@ -1540,6 +1573,23 @@ def rename_column(self, old_name: str, new_name: str):
                 f"field {old_name} not found in schema of {self.name}"
             )
 
+    def get_optional_cols(self) -> List[str]:
+        return [
+            col for col, t in self.schema().items() if fennel_is_optional(t)
+        ]
+
+    def drop_null_column(self, name: str):
+        if name in self.keys:
+            self.keys[name] = fennel_get_optional_inner(self.keys[name])
+        elif name in self.values:
+            self.values[name] = fennel_get_optional_inner(self.values[name])
+        elif name == self.timestamp:
+            raise Exception(
+                f"cannot drop_null on timestamp field {name} of {self.name}"
+            )
+        else:
+            raise Exception(f"field {name} not found in schema of {self.name}")
+
     def append_value_column(self, name: str, type_: Type):
         if name in self.keys:
             raise Exception(
@@ -1956,6 +2006,29 @@ def visitDrop(self, obj) -> DSSchema:
         output_schema.name = output_schema_name
         return output_schema
 
+    def visitDropNull(self, obj) -> DSSchema:
+        input_schema = copy.deepcopy(self.visit(obj.node))
+        output_schema_name = f"'[Pipeline:{self.pipeline_name}]->dropnull node'"
+        if obj.columns is None or len(obj.columns) == 0:
+            raise ValueError(
+                f"invalid dropnull - {output_schema_name} must have at least one column"
+            )
+        for field in obj.columns:
+            if (
+                field not in input_schema.schema()
+                or field == input_schema.timestamp
+            ):
+                raise ValueError(
+                    f"invalid dropnull column {field} not present in {input_schema.name}"
+                )
+            if not fennel_is_optional(input_schema.get_type(field)):
+                raise ValueError(
+                    f"invalid dropnull {field} has type {input_schema.get_type(field)} expected Optional type"
+                )
+        output_schema = obj.dsschema()
+        output_schema.name = output_schema_name
+        return output_schema
+
     def visitDedup(self, obj) -> DSSchema:
         input_schema = self.visit(obj.node)
         output_schema_name = (
diff --git a/fennel/datasets/test_dataset.py b/fennel/datasets/test_dataset.py
index e4593483d..b5a8bba69 100644
--- a/fennel/datasets/test_dataset.py
+++ b/fennel/datasets/test_dataset.py
@@ -1557,6 +1557,130 @@ def from_a(cls, a: Dataset):
     )
 
 
+def test_dropnull():
+    @meta(owner="test@test.com")
+    @dataset
+    class A:
+        a1: int = field(key=True)
+        a2: Optional[int]
+        a3: str
+        a4: Optional[float]
+        t: datetime
+
+    @meta(owner="test@test.com")
+    @dataset
+    class B:
+        a1: int = field(key=True)
+        a2: int
+        a3: str
+        a4: float
+        t: datetime
+
+        @pipeline(version=1)
+        @inputs(A)
+        def from_a(cls, a: Dataset):
+            x = a.dropnull("a2", "a4")
+            return x
+
+    view = InternalTestClient()
+    view.add(B)
+    sync_request = view._get_sync_request_proto()
+    assert len(sync_request.datasets) == 1
+    d = {
+        "name": "B",
+        "dsschema": {
+            "keys": {
+                "fields": [
+                    {
+                        "name": "a1",
+                        "dtype": {"int_type": {}},
+                    }
+                ]
+            },
+            "values": {
+                "fields": [
+                    {
+                        "name": "a2",
+                        "dtype": {"int_type": {}},
+                    },
+                    {
+                        "name": "a3",
+                        "dtype": {"string_type": {}},
+                    },
+                    {
+                        "name": "a4",
+                        "dtype": {"double_type": {}},
+                    },
+                ]
+            },
+            "timestamp": "t",
+        },
+        "metadata": {"owner": "test@test.com"},
+        "history": "63072000s",
+        "retention": "63072000s",
+        "field_metadata": {
+            "a1": {},
+            "a2": {},
+            "a3": {},
+            "a4": {},
+            "t": {},
+        },
+        "pycode": {},
+    }
+    dataset_req = sync_request.datasets[0]
+    dataset_req.pycode.Clear()
+    expected_dataset_request = ParseDict(d, ds_proto.CoreDataset())
+    assert dataset_req == expected_dataset_request, error_message(
+        dataset_req, expected_dataset_request
+    )
+
+    assert len(sync_request.pipelines) == 1
+    pipeline_req = sync_request.pipelines[0]
+    pipeline_req.pycode.Clear()
+    p = {
+        "name": "from_a",
+        "dataset_name": "B",
+        "signature": "from_a",
+        "metadata": {},
+        "input_dataset_names": ["A"],
+        "version": 1,
+        "active": True,
+        "pycode": {},
+    }
+    expected_pipeline_request = ParseDict(p, ds_proto.Pipeline())
+    assert pipeline_req == expected_pipeline_request, error_message(
+        pipeline_req, expected_pipeline_request
+    )
+
+    assert len(sync_request.operators) == 2
+    o = {
+        "id": "A",
+        "pipeline_name": "from_a",
+        "dataset_name": "B",
+        "dataset_ref": {"referring_dataset_name": "A"},
+    }
+    operator_req = sync_request.operators[0]
+    expected_operator_request = ParseDict(o, ds_proto.Operator())
+    assert operator_req == expected_operator_request, error_message(
+        operator_req, expected_operator_request
+    )
+
+    o = {
+        "id": "be210d6a1d0ed8ab544d737b629a1c8d",
+        "isRoot": True,
+        "pipelineName": "from_a",
+        "datasetName": "B",
+        "dropnull": {"operandId": "A", "columns": ["a2", "a4"]},
+    }
+
+    operator_req = sync_request.operators[1]
+    expected_operator_request = ParseDict(o, ds_proto.Operator())
+
+    assert operator_req == expected_operator_request, error_message(
+        operator_req, expected_operator_request
+    )
+
+
 def test_select_and_rename_column():
     @meta(owner="test@test.com")
     @dataset
diff --git a/fennel/datasets/test_schema_validator.py b/fennel/datasets/test_schema_validator.py
index c80a538d2..18f38b282 100644
--- a/fennel/datasets/test_schema_validator.py
+++ b/fennel/datasets/test_schema_validator.py
@@ -902,6 +902,67 @@ def pipeline_first(cls, hits: Dataset):
     )
 
 
+def test_drop_null():
+    with pytest.raises(TypeError) as e:
+
+        @meta(owner="test@test.com")
+        @dataset
+        class C1:
+            b1: int = field(key=True)
+            b2: Optional[int]
+            b3: str
+            t: datetime
+
+            @pipeline(version=1)
+            @inputs(C)
+            def drop_null_noargs(cls, c: Dataset):
+                return c.dropnull()
+
+    assert (
+        str(e.value)
+        == """[TypeError('Field `b2` has type `int` in `pipeline drop_null_noargs output value` schema but type `Optional[int]` in `C1 value` schema.')]"""
+    )
+    with pytest.raises(ValueError) as e:
+
+        @meta(owner="test@test.com")
+        @dataset
+        class C2:
+            b1: int = field(key=True)
+            b2: int
+            b3: str
+            t: datetime
+
+            @pipeline(version=1)
+            @inputs(C)
+            def drop_null_non_opt(cls, c: Dataset):
+                return c.dropnull("b1")
+
+    assert (
+        str(e.value)
+        == "invalid dropnull b1 has type  expected Optional type"
+    )
+    with pytest.raises(ValueError) as e:
+
+        @meta(owner="test@test.com")
+        @dataset
+        class C3:
+            b1: int = field(key=True)
+            b2: int
+            b3: str
+            t: datetime
+
+            @pipeline(version=1)
+            @inputs(C)
+            def drop_null_non_present(cls, c: Dataset):
+                return c.dropnull("b4")
+
+    assert (
+        str(e.value)
+        == "invalid dropnull column b4 not present in '[Dataset:C]'"
+    )
+
+
 def test_assign():
     @meta(owner="test@test.com")
     @dataset
@@ -979,8 +1040,7 @@ def create_dataset(cls, activity: Dataset):
         str(e.value)
         == "invalid assign - '[Pipeline:create_dataset]->assign node' must specify a column to assign"
     )
-
-    with pytest.raises(ValueError) as e:
+    with pytest.raises(Exception) as e:
 
         @dataset
         class RatingActivity4:
diff --git a/fennel/gen/auth_pb2.py b/fennel/gen/auth_pb2.py
index c478c945b..5fb3361ee 100644
--- a/fennel/gen/auth_pb2.py
+++ b/fennel/gen/auth_pb2.py
@@ -2,10 +2,10 @@
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: auth.proto
 """Generated protocol buffer code."""
-from google.protobuf.internal import builder as _builder
 from google.protobuf import descriptor as _descriptor
 from google.protobuf import descriptor_pool as _descriptor_pool
 from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
 # @@protoc_insertion_point(imports)
 
 _sym_db = _symbol_database.Default()
@@ -15,13 +15,13 @@
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nauth.proto\x12\x11\x66\x65nnel.proto.auth*\xb8\x01\n\rOrgPermission\x12\x0b\n\x07ORG_ALL\x10\x00\x12\n\n\x06INVITE\x10\x01\x12\x0f\n\x0b\x43REATE_ROLE\x10\x02\x12\r\n\tEDIT_ROLE\x10\x03\x12\r\n\tVIEW_ROLE\x10\x04\x12\x0f\n\x0b\x41SSIGN_ROLE\x10\x05\x12\x12\n\x0ePROVISION_TIER\x10\x06\x12\x0f\n\x0b\x44\x45LETE_TIER\x10\x07\x12\x14\n\x10SET_DEFAULT_ROLE\x10\x08\x12\x13\n\x0f\x41SSUME_IDENTITY\x10\t*\x80\x02\n\x0eTierPermission\x12\x0c\n\x08TIER_ALL\x10\x00\x12\x16\n\x12MODIFY_ENVIRONMENT\x10\x01\x12\x0c\n\x08\x41\x44\x44_TAGS\x10\x02\x12\x14\n\x10\x44\x45LETE_TIER_ONLY\x10\x03\x12\r\n\tVIEW_TIER\x10\x04\x12\x0f\n\x0bTIER_ACCESS\x10\x05\x12\x1a\n\x16VIEW_ENTITY_DEFINITION\x10\x06\x12\x1a\n\x16\x45\x44IT_ENTITY_DEFINITION\x10\x07\x12\x14\n\x10READ_ENTITY_DATA\x10\x08\x12\x15\n\x11WRITE_ENTITY_DATA\x10\t\x12\x1f\n\x1b\x45XTRACT_HISTORICAL_FEATURES\x10\nb\x06proto3')
 
-_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
-_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'auth_pb2', globals())
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'auth_pb2', _globals)
 if _descriptor._USE_C_DESCRIPTORS == False:
-
   DESCRIPTOR._options = None
-  _ORGPERMISSION._serialized_start=34
-  _ORGPERMISSION._serialized_end=218
-  _TIERPERMISSION._serialized_start=221
-  _TIERPERMISSION._serialized_end=477
+  _globals['_ORGPERMISSION']._serialized_start=34
+  _globals['_ORGPERMISSION']._serialized_end=218
+  _globals['_TIERPERMISSION']._serialized_start=221
+  _globals['_TIERPERMISSION']._serialized_end=477
 # @@protoc_insertion_point(module_scope)
diff --git a/fennel/gen/dataset_pb2.py b/fennel/gen/dataset_pb2.py
index 26d31ee35..aa173f831 100644
--- a/fennel/gen/dataset_pb2.py
+++ b/fennel/gen/dataset_pb2.py
@@ -18,7 +18,7 @@
 from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rdataset.proto\x12\x14\x66\x65nnel.proto.dataset\x1a\x0emetadata.proto\x1a\x0cpycode.proto\x1a\x0cschema.proto\x1a\nspec.proto\x1a\x1egoogle/protobuf/duration.proto\"\xb8\x04\n\x0b\x43oreDataset\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x08metadata\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12/\n\x08\x64sschema\x18\x03 \x01(\x0b\x32\x1d.fennel.proto.schema.DSSchema\x12*\n\x07history\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12,\n\tretention\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12L\n\x0e\x66ield_metadata\x18\x06 \x03(\x0b\x32\x34.fennel.proto.dataset.CoreDataset.FieldMetadataEntry\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x19\n\x11is_source_dataset\x18\t \x01(\x08\x12\x37\n\x08lineages\x18\x08 \x01(\x0b\x32%.fennel.proto.dataset.DatasetLineages\x12\x37\n\x0f\x61\x63tive_dataflow\x18\n \x01(\x0b\x32\x1e.fennel.proto.dataset.Dataflow\x1aU\n\x12\x46ieldMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12.\n\x05value\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata:\x02\x38\x01\"Q\n\x08OnDemand\x12\x1c\n\x14\x66unction_source_code\x18\x01 \x01(\t\x12\x10\n\x08\x66unction\x18\x02 \x01(\x0c\x12\x15\n\rexpires_after\x18\x03 \x01(\x03\"\x99\x02\n\x08Pipeline\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x14\n\x0c\x64\x61taset_name\x18\x02 \x01(\t\x12\x11\n\tsignature\x18\x03 \x01(\t\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x1b\n\x13input_dataset_names\x18\x05 \x03(\t\x12\x0f\n\x07version\x18\x06 \x01(\r\x12\x0e\n\x06\x61\x63tive\x18\x07 \x01(\x08\x12\x38\n\x08lineages\x18\x08 \x01(\x0b\x32&.fennel.proto.dataset.PipelineLineages\x12+\n\x06pycode\x18\t \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\"\xb3\x05\n\x08Operator\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07is_root\x18\x02 \x01(\x08\x12\x15\n\rpipeline_name\x18\x03 \x01(\t\x12\x14\n\x0c\x64\x61taset_name\x18\x04 \x01(\t\x12\x34\n\taggregate\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.dataset.AggregateH\x00\x12*\n\x04join\x18\x06 \x01(\x0b\x32\x1a.fennel.proto.dataset.JoinH\x00\x12\x34\n\ttransform\x18\x07 \x01(\x0b\x32\x1f.fennel.proto.dataset.TransformH\x00\x12,\n\x05union\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.dataset.UnionH\x00\x12.\n\x06\x66ilter\x18\t \x01(\x0b\x32\x1c.fennel.proto.dataset.FilterH\x00\x12\x37\n\x0b\x64\x61taset_ref\x18\n \x01(\x0b\x32 .fennel.proto.dataset.DatasetRefH\x00\x12.\n\x06rename\x18\x0c \x01(\x0b\x32\x1c.fennel.proto.dataset.RenameH\x00\x12*\n\x04\x64rop\x18\r \x01(\x0b\x32\x1a.fennel.proto.dataset.DropH\x00\x12\x30\n\x07\x65xplode\x18\x0e \x01(\x0b\x32\x1d.fennel.proto.dataset.ExplodeH\x00\x12,\n\x05\x64\x65\x64up\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.dataset.DedupH\x00\x12,\n\x05\x66irst\x18\x10 \x01(\x0b\x32\x1b.fennel.proto.dataset.FirstH\x00\x12.\n\x06\x61ssign\x18\x11 \x01(\x0b\x32\x1c.fennel.proto.dataset.AssignH\x00\x12\x0c\n\x04name\x18\x0b \x01(\tB\x06\n\x04kind\"n\n\tAggregate\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0c\n\x04keys\x18\x02 \x03(\t\x12)\n\x05specs\x18\x03 \x03(\x0b\x32\x1a.fennel.proto.spec.PreSpec\x12\x14\n\x0coperand_name\x18\x04 \x01(\t\"\xa2\x03\n\x04Join\x12\x16\n\x0elhs_operand_id\x18\x01 \x01(\t\x12\x1c\n\x14rhs_dsref_operand_id\x18\x02 \x01(\t\x12.\n\x02on\x18\x03 \x03(\x0b\x32\".fennel.proto.dataset.Join.OnEntry\x12\x32\n\nwithin_low\x18\x06 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x33\n\x0bwithin_high\x18\x07 \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12\x18\n\x10lhs_operand_name\x18\x04 \x01(\t\x12\x1e\n\x16rhs_dsref_operand_name\x18\x05 \x01(\t\x12+\n\x03how\x18\x08 \x01(\x0e\x32\x1e.fennel.proto.dataset.Join.How\x1a)\n\x07OnEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1a\n\x03How\x12\x08\n\x04Left\x10\x00\x12\t\n\x05Inner\x10\x01\x42\r\n\x0b_within_lowB\x0e\n\x0c_within_high\"\xed\x01\n\tTransform\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12;\n\x06schema\x18\x02 \x03(\x0b\x32+.fennel.proto.dataset.Transform.SchemaEntry\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x14\n\x0coperand_name\x18\x04 \x01(\t\x1aL\n\x0bSchemaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType:\x02\x38\x01\"_\n\x06\x46ilter\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12+\n\x06pycode\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"\xa8\x01\n\x06\x41ssign\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12+\n\x06pycode\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x13\n\x0b\x63olumn_name\x18\x03 \x01(\t\x12\x32\n\x0boutput_type\x18\x04 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12\x14\n\x0coperand_name\x18\x05 \x01(\t\"B\n\x04\x44rop\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x10\n\x08\x64ropcols\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"\xa5\x01\n\x06Rename\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12?\n\ncolumn_map\x18\x02 \x03(\x0b\x32+.fennel.proto.dataset.Rename.ColumnMapEntry\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\x1a\x30\n\x0e\x43olumnMapEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"3\n\x05Union\x12\x13\n\x0boperand_ids\x18\x01 \x03(\t\x12\x15\n\roperand_names\x18\x02 \x03(\t\"B\n\x05\x44\x65\x64up\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"D\n\x07\x45xplode\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"=\n\x05\x46irst\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\n\n\x02\x62y\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\",\n\nDatasetRef\x12\x1e\n\x16referring_dataset_name\x18\x01 \x01(\t\"\xf2\x01\n\x08\x44\x61taflow\x12\x16\n\x0c\x64\x61taset_name\x18\x01 \x01(\tH\x00\x12L\n\x11pipeline_dataflow\x18\x02 \x01(\x0b\x32/.fennel.proto.dataset.Dataflow.PipelineDataflowH\x00\x1ax\n\x10PipelineDataflow\x12\x14\n\x0c\x64\x61taset_name\x18\x01 \x01(\t\x12\x15\n\rpipeline_name\x18\x02 \x01(\t\x12\x37\n\x0finput_dataflows\x18\x03 \x03(\x0b\x32\x1e.fennel.proto.dataset.DataflowB\x06\n\x04kind\"\x8e\x01\n\x10PipelineLineages\x12\x14\n\x0c\x64\x61taset_name\x18\x01 \x01(\t\x12\x15\n\rpipeline_name\x18\x02 \x01(\t\x12=\n\x0einput_datasets\x18\x03 \x03(\x0b\x32%.fennel.proto.dataset.DatasetLineages\x12\x0e\n\x06\x61\x63tive\x18\x04 \x01(\x08\"\\\n\x17\x44\x61tasetPipelineLineages\x12\x41\n\x11pipeline_lineages\x18\x02 \x03(\x0b\x32&.fennel.proto.dataset.PipelineLineages\"}\n\x0f\x44\x61tasetLineages\x12\x18\n\x0esource_dataset\x18\x01 \x01(\tH\x00\x12H\n\x0f\x64\x65rived_dataset\x18\x02 \x01(\x0b\x32-.fennel.proto.dataset.DatasetPipelineLineagesH\x00\x42\x06\n\x04kindb\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rdataset.proto\x12\x14\x66\x65nnel.proto.dataset\x1a\x0emetadata.proto\x1a\x0cpycode.proto\x1a\x0cschema.proto\x1a\nspec.proto\x1a\x1egoogle/protobuf/duration.proto\"\xc6\x04\n\x0b\x43oreDataset\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x08metadata\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12/\n\x08\x64sschema\x18\x03 \x01(\x0b\x32\x1d.fennel.proto.schema.DSSchema\x12*\n\x07history\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12,\n\tretention\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12L\n\x0e\x66ield_metadata\x18\x06 \x03(\x0b\x32\x34.fennel.proto.dataset.CoreDataset.FieldMetadataEntry\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x19\n\x11is_source_dataset\x18\t \x01(\x08\x12\x37\n\x08lineages\x18\x08 \x01(\x0b\x32%.fennel.proto.dataset.DatasetLineages\x12\x37\n\x0f\x61\x63tive_dataflow\x18\n \x01(\x0b\x32\x1e.fennel.proto.dataset.Dataflow\x12\x0c\n\x04tags\x18\x0b \x03(\t\x1aU\n\x12\x46ieldMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12.\n\x05value\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata:\x02\x38\x01\"Q\n\x08OnDemand\x12\x1c\n\x14\x66unction_source_code\x18\x01 \x01(\t\x12\x10\n\x08\x66unction\x18\x02 \x01(\x0c\x12\x15\n\rexpires_after\x18\x03 \x01(\x03\"\xa7\x02\n\x08Pipeline\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x14\n\x0c\x64\x61taset_name\x18\x02 \x01(\t\x12\x11\n\tsignature\x18\x03 \x01(\t\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x1b\n\x13input_dataset_names\x18\x05 \x03(\t\x12\x0f\n\x07version\x18\x06 \x01(\r\x12\x0e\n\x06\x61\x63tive\x18\x07 \x01(\x08\x12\x38\n\x08lineages\x18\x08 \x01(\x0b\x32&.fennel.proto.dataset.PipelineLineages\x12+\n\x06pycode\x18\t \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x0c\n\x04tags\x18\n \x03(\t\"\xe7\x05\n\x08Operator\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07is_root\x18\x02 \x01(\x08\x12\x15\n\rpipeline_name\x18\x03 \x01(\t\x12\x14\n\x0c\x64\x61taset_name\x18\x04 \x01(\t\x12\x34\n\taggregate\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.dataset.AggregateH\x00\x12*\n\x04join\x18\x06 \x01(\x0b\x32\x1a.fennel.proto.dataset.JoinH\x00\x12\x34\n\ttransform\x18\x07 \x01(\x0b\x32\x1f.fennel.proto.dataset.TransformH\x00\x12,\n\x05union\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.dataset.UnionH\x00\x12.\n\x06\x66ilter\x18\t \x01(\x0b\x32\x1c.fennel.proto.dataset.FilterH\x00\x12\x37\n\x0b\x64\x61taset_ref\x18\n \x01(\x0b\x32 .fennel.proto.dataset.DatasetRefH\x00\x12.\n\x06rename\x18\x0c \x01(\x0b\x32\x1c.fennel.proto.dataset.RenameH\x00\x12*\n\x04\x64rop\x18\r \x01(\x0b\x32\x1a.fennel.proto.dataset.DropH\x00\x12\x30\n\x07\x65xplode\x18\x0e \x01(\x0b\x32\x1d.fennel.proto.dataset.ExplodeH\x00\x12,\n\x05\x64\x65\x64up\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.dataset.DedupH\x00\x12,\n\x05\x66irst\x18\x10 \x01(\x0b\x32\x1b.fennel.proto.dataset.FirstH\x00\x12.\n\x06\x61ssign\x18\x11 \x01(\x0b\x32\x1c.fennel.proto.dataset.AssignH\x00\x12\x32\n\x08\x64ropnull\x18\x12 \x01(\x0b\x32\x1e.fennel.proto.dataset.DropnullH\x00\x12\x0c\n\x04name\x18\x0b \x01(\tB\x06\n\x04kind\"n\n\tAggregate\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0c\n\x04keys\x18\x02 \x03(\t\x12)\n\x05specs\x18\x03 \x03(\x0b\x32\x1a.fennel.proto.spec.PreSpec\x12\x14\n\x0coperand_name\x18\x04 \x01(\t\"\xa2\x03\n\x04Join\x12\x16\n\x0elhs_operand_id\x18\x01 \x01(\t\x12\x1c\n\x14rhs_dsref_operand_id\x18\x02 \x01(\t\x12.\n\x02on\x18\x03 \x03(\x0b\x32\".fennel.proto.dataset.Join.OnEntry\x12\x32\n\nwithin_low\x18\x06 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x33\n\x0bwithin_high\x18\x07 \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12\x18\n\x10lhs_operand_name\x18\x04 \x01(\t\x12\x1e\n\x16rhs_dsref_operand_name\x18\x05 \x01(\t\x12+\n\x03how\x18\x08 \x01(\x0e\x32\x1e.fennel.proto.dataset.Join.How\x1a)\n\x07OnEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1a\n\x03How\x12\x08\n\x04Left\x10\x00\x12\t\n\x05Inner\x10\x01\x42\r\n\x0b_within_lowB\x0e\n\x0c_within_high\"\xed\x01\n\tTransform\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12;\n\x06schema\x18\x02 \x03(\x0b\x32+.fennel.proto.dataset.Transform.SchemaEntry\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x14\n\x0coperand_name\x18\x04 \x01(\t\x1aL\n\x0bSchemaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType:\x02\x38\x01\"_\n\x06\x46ilter\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12+\n\x06pycode\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"\xa8\x01\n\x06\x41ssign\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12+\n\x06pycode\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x13\n\x0b\x63olumn_name\x18\x03 \x01(\t\x12\x32\n\x0boutput_type\x18\x04 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12\x14\n\x0coperand_name\x18\x05 \x01(\t\"E\n\x08\x44ropnull\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"B\n\x04\x44rop\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x10\n\x08\x64ropcols\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"\xa5\x01\n\x06Rename\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12?\n\ncolumn_map\x18\x02 \x03(\x0b\x32+.fennel.proto.dataset.Rename.ColumnMapEntry\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\x1a\x30\n\x0e\x43olumnMapEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"3\n\x05Union\x12\x13\n\x0boperand_ids\x18\x01 \x03(\t\x12\x15\n\roperand_names\x18\x02 \x03(\t\"B\n\x05\x44\x65\x64up\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"D\n\x07\x45xplode\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"=\n\x05\x46irst\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\n\n\x02\x62y\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\",\n\nDatasetRef\x12\x1e\n\x16referring_dataset_name\x18\x01 \x01(\t\"\x80\x02\n\x08\x44\x61taflow\x12\x16\n\x0c\x64\x61taset_name\x18\x01 \x01(\tH\x00\x12L\n\x11pipeline_dataflow\x18\x02 \x01(\x0b\x32/.fennel.proto.dataset.Dataflow.PipelineDataflowH\x00\x12\x0c\n\x04tags\x18\x03 \x03(\t\x1ax\n\x10PipelineDataflow\x12\x14\n\x0c\x64\x61taset_name\x18\x01 \x01(\t\x12\x15\n\rpipeline_name\x18\x02 \x01(\t\x12\x37\n\x0finput_dataflows\x18\x03 \x03(\x0b\x32\x1e.fennel.proto.dataset.DataflowB\x06\n\x04kind\"\x9c\x01\n\x10PipelineLineages\x12\x14\n\x0c\x64\x61taset_name\x18\x01 \x01(\t\x12\x15\n\rpipeline_name\x18\x02 \x01(\t\x12=\n\x0einput_datasets\x18\x03 \x03(\x0b\x32%.fennel.proto.dataset.DatasetLineages\x12\x0e\n\x06\x61\x63tive\x18\x04 \x01(\x08\x12\x0c\n\x04tags\x18\x05 \x03(\t\"\\\n\x17\x44\x61tasetPipelineLineages\x12\x41\n\x11pipeline_lineages\x18\x02 \x03(\x0b\x32&.fennel.proto.dataset.PipelineLineages\"\x8b\x01\n\x0f\x44\x61tasetLineages\x12\x18\n\x0esource_dataset\x18\x01 \x01(\tH\x00\x12H\n\x0f\x64\x65rived_dataset\x18\x02 \x01(\x0b\x32-.fennel.proto.dataset.DatasetPipelineLineagesH\x00\x12\x0c\n\x04tags\x18\x03 \x03(\tB\x06\n\x04kindb\x06proto3')
 
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -34,55 +34,57 @@
   _RENAME_COLUMNMAPENTRY._options = None
   _RENAME_COLUMNMAPENTRY._serialized_options = b'8\001'
   _globals['_COREDATASET']._serialized_start=128
-  _globals['_COREDATASET']._serialized_end=696
-  _globals['_COREDATASET_FIELDMETADATAENTRY']._serialized_start=611
-  _globals['_COREDATASET_FIELDMETADATAENTRY']._serialized_end=696
-  _globals['_ONDEMAND']._serialized_start=698
-  _globals['_ONDEMAND']._serialized_end=779
-  _globals['_PIPELINE']._serialized_start=782
-  _globals['_PIPELINE']._serialized_end=1063
-  _globals['_OPERATOR']._serialized_start=1066
-  _globals['_OPERATOR']._serialized_end=1757
-  _globals['_AGGREGATE']._serialized_start=1759
-  _globals['_AGGREGATE']._serialized_end=1869
-  _globals['_JOIN']._serialized_start=1872
-  _globals['_JOIN']._serialized_end=2290
-  _globals['_JOIN_ONENTRY']._serialized_start=2190
-  _globals['_JOIN_ONENTRY']._serialized_end=2231
-  _globals['_JOIN_HOW']._serialized_start=2233
-  _globals['_JOIN_HOW']._serialized_end=2259
-  _globals['_TRANSFORM']._serialized_start=2293
-  _globals['_TRANSFORM']._serialized_end=2530
-  _globals['_TRANSFORM_SCHEMAENTRY']._serialized_start=2454
-  _globals['_TRANSFORM_SCHEMAENTRY']._serialized_end=2530
-  _globals['_FILTER']._serialized_start=2532
-  _globals['_FILTER']._serialized_end=2627
-  _globals['_ASSIGN']._serialized_start=2630
-  _globals['_ASSIGN']._serialized_end=2798
-  _globals['_DROP']._serialized_start=2800
-  _globals['_DROP']._serialized_end=2866
-  _globals['_RENAME']._serialized_start=2869
-  _globals['_RENAME']._serialized_end=3034
-  _globals['_RENAME_COLUMNMAPENTRY']._serialized_start=2986
-  _globals['_RENAME_COLUMNMAPENTRY']._serialized_end=3034
-  _globals['_UNION']._serialized_start=3036
-  _globals['_UNION']._serialized_end=3087
-  _globals['_DEDUP']._serialized_start=3089
-  _globals['_DEDUP']._serialized_end=3155
-  _globals['_EXPLODE']._serialized_start=3157
-  _globals['_EXPLODE']._serialized_end=3225
-  _globals['_FIRST']._serialized_start=3227
-  _globals['_FIRST']._serialized_end=3288
-  _globals['_DATASETREF']._serialized_start=3290
-  _globals['_DATASETREF']._serialized_end=3334
-  _globals['_DATAFLOW']._serialized_start=3337
-  _globals['_DATAFLOW']._serialized_end=3579
-  _globals['_DATAFLOW_PIPELINEDATAFLOW']._serialized_start=3451
-  _globals['_DATAFLOW_PIPELINEDATAFLOW']._serialized_end=3571
-  _globals['_PIPELINELINEAGES']._serialized_start=3582
-  _globals['_PIPELINELINEAGES']._serialized_end=3724
-  _globals['_DATASETPIPELINELINEAGES']._serialized_start=3726
-  _globals['_DATASETPIPELINELINEAGES']._serialized_end=3818
-  _globals['_DATASETLINEAGES']._serialized_start=3820
-  _globals['_DATASETLINEAGES']._serialized_end=3945
+  _globals['_COREDATASET']._serialized_end=710
+  _globals['_COREDATASET_FIELDMETADATAENTRY']._serialized_start=625
+  _globals['_COREDATASET_FIELDMETADATAENTRY']._serialized_end=710
+  _globals['_ONDEMAND']._serialized_start=712
+  _globals['_ONDEMAND']._serialized_end=793
+  _globals['_PIPELINE']._serialized_start=796
+  _globals['_PIPELINE']._serialized_end=1091
+  _globals['_OPERATOR']._serialized_start=1094
+  _globals['_OPERATOR']._serialized_end=1837
+  _globals['_AGGREGATE']._serialized_start=1839
+  _globals['_AGGREGATE']._serialized_end=1949
+  _globals['_JOIN']._serialized_start=1952
+  _globals['_JOIN']._serialized_end=2370
+  _globals['_JOIN_ONENTRY']._serialized_start=2270
+  _globals['_JOIN_ONENTRY']._serialized_end=2311
+  _globals['_JOIN_HOW']._serialized_start=2313
+  _globals['_JOIN_HOW']._serialized_end=2339
+  _globals['_TRANSFORM']._serialized_start=2373
+  _globals['_TRANSFORM']._serialized_end=2610
+  _globals['_TRANSFORM_SCHEMAENTRY']._serialized_start=2534
+  _globals['_TRANSFORM_SCHEMAENTRY']._serialized_end=2610
+  _globals['_FILTER']._serialized_start=2612
+  _globals['_FILTER']._serialized_end=2707
+  _globals['_ASSIGN']._serialized_start=2710
+  _globals['_ASSIGN']._serialized_end=2878
+  _globals['_DROPNULL']._serialized_start=2880
+  _globals['_DROPNULL']._serialized_end=2949
+  _globals['_DROP']._serialized_start=2951
+  _globals['_DROP']._serialized_end=3017
+  _globals['_RENAME']._serialized_start=3020
+  _globals['_RENAME']._serialized_end=3185
+  _globals['_RENAME_COLUMNMAPENTRY']._serialized_start=3137
+  _globals['_RENAME_COLUMNMAPENTRY']._serialized_end=3185
+  _globals['_UNION']._serialized_start=3187
+  _globals['_UNION']._serialized_end=3238
+  _globals['_DEDUP']._serialized_start=3240
+  _globals['_DEDUP']._serialized_end=3306
+  _globals['_EXPLODE']._serialized_start=3308
+  _globals['_EXPLODE']._serialized_end=3376
+  _globals['_FIRST']._serialized_start=3378
+  _globals['_FIRST']._serialized_end=3439
+  _globals['_DATASETREF']._serialized_start=3441
+  _globals['_DATASETREF']._serialized_end=3485
+  _globals['_DATAFLOW']._serialized_start=3488
+  _globals['_DATAFLOW']._serialized_end=3744
+  _globals['_DATAFLOW_PIPELINEDATAFLOW']._serialized_start=3616
+  _globals['_DATAFLOW_PIPELINEDATAFLOW']._serialized_end=3736
+  _globals['_PIPELINELINEAGES']._serialized_start=3747
+  _globals['_PIPELINELINEAGES']._serialized_end=3903
+  _globals['_DATASETPIPELINELINEAGES']._serialized_start=3905
+  _globals['_DATASETPIPELINELINEAGES']._serialized_end=3997
+  _globals['_DATASETLINEAGES']._serialized_start=4000
+  _globals['_DATASETLINEAGES']._serialized_end=4139
 # @@protoc_insertion_point(module_scope)
diff --git a/fennel/gen/dataset_pb2.pyi b/fennel/gen/dataset_pb2.pyi
index 0dadf5f21..a76d71fb0 100644
--- a/fennel/gen/dataset_pb2.pyi
+++ b/fennel/gen/dataset_pb2.pyi
@@ -55,6 +55,7 @@ class CoreDataset(google.protobuf.message.Message):
     IS_SOURCE_DATASET_FIELD_NUMBER: builtins.int
     LINEAGES_FIELD_NUMBER: builtins.int
     ACTIVE_DATAFLOW_FIELD_NUMBER: builtins.int
+    TAGS_FIELD_NUMBER: builtins.int
     name: builtins.str
     @property
     def metadata(self) -> metadata_pb2.Metadata: ...
@@ -76,6 +77,9 @@ class CoreDataset(google.protobuf.message.Message):
         """
     @property
     def active_dataflow(self) -> global___Dataflow: ...
+    @property
+    def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
+        """Union of all tags based on ownership and dataflow semantics."""
     def __init__(
         self,
         *,
@@ -89,9 +93,10 @@ class CoreDataset(google.protobuf.message.Message):
         is_source_dataset: builtins.bool = ...,
         lineages: global___DatasetLineages | None = ...,
         active_dataflow: global___Dataflow | None = ...,
+        tags: collections.abc.Iterable[builtins.str] | None = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["active_dataflow", b"active_dataflow", "dsschema", b"dsschema", "history", b"history", "lineages", b"lineages", "metadata", b"metadata", "pycode", b"pycode", "retention", b"retention"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["active_dataflow", b"active_dataflow", "dsschema", b"dsschema", "field_metadata", b"field_metadata", "history", b"history", "is_source_dataset", b"is_source_dataset", "lineages", b"lineages", "metadata", b"metadata", "name", b"name", "pycode", b"pycode", "retention", b"retention"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["active_dataflow", b"active_dataflow", "dsschema", b"dsschema", "field_metadata", b"field_metadata", "history", b"history", "is_source_dataset", b"is_source_dataset", "lineages", b"lineages", "metadata", b"metadata", "name", b"name", "pycode", b"pycode", "retention", b"retention", "tags", b"tags"]) -> None: ...
 
 global___CoreDataset = CoreDataset
 
@@ -137,6 +142,7 @@ class Pipeline(google.protobuf.message.Message):
     ACTIVE_FIELD_NUMBER: builtins.int
     LINEAGES_FIELD_NUMBER: builtins.int
     PYCODE_FIELD_NUMBER: builtins.int
+    TAGS_FIELD_NUMBER: builtins.int
     name: builtins.str
     dataset_name: builtins.str
     signature: builtins.str
@@ -153,6 +159,9 @@ class Pipeline(google.protobuf.message.Message):
         """
     @property
     def pycode(self) -> pycode_pb2.PyCode: ...
+    @property
+    def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
+        """Tags that only belong to this pipeline."""
     def __init__(
         self,
         *,
@@ -165,9 +174,10 @@ class Pipeline(google.protobuf.message.Message):
         active: builtins.bool = ...,
         lineages: global___PipelineLineages | None = ...,
         pycode: pycode_pb2.PyCode | None = ...,
+        tags: collections.abc.Iterable[builtins.str] | None = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["lineages", b"lineages", "metadata", b"metadata", "pycode", b"pycode"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["active", b"active", "dataset_name", b"dataset_name", "input_dataset_names", b"input_dataset_names", "lineages", b"lineages", "metadata", b"metadata", "name", b"name", "pycode", b"pycode", "signature", b"signature", "version", b"version"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["active", b"active", "dataset_name", b"dataset_name", "input_dataset_names", b"input_dataset_names", "lineages", b"lineages", "metadata", b"metadata", "name", b"name", "pycode", b"pycode", "signature", b"signature", "tags", b"tags", "version", b"version"]) -> None: ...
 
 global___Pipeline = Pipeline
 
@@ -193,6 +203,7 @@ class Operator(google.protobuf.message.Message):
     DEDUP_FIELD_NUMBER: builtins.int
     FIRST_FIELD_NUMBER: builtins.int
     ASSIGN_FIELD_NUMBER: builtins.int
+    DROPNULL_FIELD_NUMBER: builtins.int
     NAME_FIELD_NUMBER: builtins.int
     id: builtins.str
     """Every operator has an ID assigned by the client"""
@@ -228,6 +239,8 @@ class Operator(google.protobuf.message.Message):
     def first(self) -> global___First: ...
     @property
     def assign(self) -> global___Assign: ...
+    @property
+    def dropnull(self) -> global___Dropnull: ...
     name: builtins.str
     """NOTE: FOLLOWING PROPERTIES ARE SET BY THE SERVER AND WILL BE IGNORED BY
     THE CLIENT
@@ -253,11 +266,12 @@ class Operator(google.protobuf.message.Message):
         dedup: global___Dedup | None = ...,
         first: global___First | None = ...,
         assign: global___Assign | None = ...,
+        dropnull: global___Dropnull | None = ...,
         name: builtins.str = ...,
     ) -> None: ...
-    def HasField(self, field_name: typing_extensions.Literal["aggregate", b"aggregate", "assign", b"assign", "dataset_ref", b"dataset_ref", "dedup", b"dedup", "drop", b"drop", "explode", b"explode", "filter", b"filter", "first", b"first", "join", b"join", "kind", b"kind", "rename", b"rename", "transform", b"transform", "union", b"union"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["aggregate", b"aggregate", "assign", b"assign", "dataset_name", b"dataset_name", "dataset_ref", b"dataset_ref", "dedup", b"dedup", "drop", b"drop", "explode", b"explode", "filter", b"filter", "first", b"first", "id", b"id", "is_root", b"is_root", "join", b"join", "kind", b"kind", "name", b"name", "pipeline_name", b"pipeline_name", "rename", b"rename", "transform", b"transform", "union", b"union"]) -> None: ...
-    def WhichOneof(self, oneof_group: typing_extensions.Literal["kind", b"kind"]) -> typing_extensions.Literal["aggregate", "join", "transform", "union", "filter", "dataset_ref", "rename", "drop", "explode", "dedup", "first", "assign"] | None: ...
+    def HasField(self, field_name: typing_extensions.Literal["aggregate", b"aggregate", "assign", b"assign", "dataset_ref", b"dataset_ref", "dedup", b"dedup", "drop", b"drop", "dropnull", b"dropnull", "explode", b"explode", "filter", b"filter", "first", b"first", "join", b"join", "kind", b"kind", "rename", b"rename", "transform", b"transform", "union", b"union"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing_extensions.Literal["aggregate", b"aggregate", "assign", b"assign", "dataset_name", b"dataset_name", "dataset_ref", b"dataset_ref", "dedup", b"dedup", "drop", b"drop", "dropnull", b"dropnull", "explode", b"explode", "filter", b"filter", "first", b"first", "id", b"id", "is_root", b"is_root", "join", b"join", "kind", b"kind", "name", b"name", "pipeline_name", b"pipeline_name", "rename", b"rename", "transform", b"transform", "union", b"union"]) -> None: ...
+    def WhichOneof(self, oneof_group: typing_extensions.Literal["kind", b"kind"]) -> typing_extensions.Literal["aggregate", "join", "transform", "union", "filter", "dataset_ref", "rename", "drop", "explode", "dedup", "first", "assign", "dropnull"] | None: ...
 
 global___Operator = Operator
 
@@ -475,6 +489,31 @@ class Assign(google.protobuf.message.Message):
 
 global___Assign = Assign
 
+@typing_extensions.final
+class Dropnull(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    OPERAND_ID_FIELD_NUMBER: builtins.int
+    COLUMNS_FIELD_NUMBER: builtins.int
+    OPERAND_NAME_FIELD_NUMBER: builtins.int
+    operand_id: builtins.str
+    @property
+    def columns(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
+    operand_name: builtins.str
+    """NOTE: FOLLOWING PROPERTIES ARE SET BY THE SERVER AND WILL BE IGNORED BY
+    THE CLIENT
+    """
+    def __init__(
+        self,
+        *,
+        operand_id: builtins.str = ...,
+        columns: collections.abc.Iterable[builtins.str] | None = ...,
+        operand_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["columns", b"columns", "operand_id", b"operand_id", "operand_name", b"operand_name"]) -> None: ...
+
+global___Dropnull = Dropnull
+
 @typing_extensions.final
 class Drop(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -685,17 +724,21 @@ class Dataflow(google.protobuf.message.Message):
 
     DATASET_NAME_FIELD_NUMBER: builtins.int
     PIPELINE_DATAFLOW_FIELD_NUMBER: builtins.int
+    TAGS_FIELD_NUMBER: builtins.int
     dataset_name: builtins.str
     @property
     def pipeline_dataflow(self) -> global___Dataflow.PipelineDataflow: ...
+    @property
+    def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
     def __init__(
         self,
         *,
         dataset_name: builtins.str = ...,
         pipeline_dataflow: global___Dataflow.PipelineDataflow | None = ...,
+        tags: collections.abc.Iterable[builtins.str] | None = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["dataset_name", b"dataset_name", "kind", b"kind", "pipeline_dataflow", b"pipeline_dataflow"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["dataset_name", b"dataset_name", "kind", b"kind", "pipeline_dataflow", b"pipeline_dataflow"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["dataset_name", b"dataset_name", "kind", b"kind", "pipeline_dataflow", b"pipeline_dataflow", "tags", b"tags"]) -> None: ...
     def WhichOneof(self, oneof_group: typing_extensions.Literal["kind", b"kind"]) -> typing_extensions.Literal["dataset_name", "pipeline_dataflow"] | None: ...
 
 global___Dataflow = Dataflow
@@ -708,11 +751,14 @@ class PipelineLineages(google.protobuf.message.Message):
     PIPELINE_NAME_FIELD_NUMBER: builtins.int
     INPUT_DATASETS_FIELD_NUMBER: builtins.int
     ACTIVE_FIELD_NUMBER: builtins.int
+    TAGS_FIELD_NUMBER: builtins.int
     dataset_name: builtins.str
     pipeline_name: builtins.str
     @property
     def input_datasets(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___DatasetLineages]: ...
     active: builtins.bool
+    @property
+    def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
     def __init__(
         self,
         *,
@@ -720,8 +766,9 @@ class PipelineLineages(google.protobuf.message.Message):
         pipeline_name: builtins.str = ...,
         input_datasets: collections.abc.Iterable[global___DatasetLineages] | None = ...,
         active: builtins.bool = ...,
+        tags: collections.abc.Iterable[builtins.str] | None = ...,
     ) -> None: ...
-    def ClearField(self, field_name: typing_extensions.Literal["active", b"active", "dataset_name", b"dataset_name", "input_datasets", b"input_datasets", "pipeline_name", b"pipeline_name"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["active", b"active", "dataset_name", b"dataset_name", "input_datasets", b"input_datasets", "pipeline_name", b"pipeline_name", "tags", b"tags"]) -> None: ...
 
 global___PipelineLineages = PipelineLineages
 
@@ -747,6 +794,7 @@ class DatasetLineages(google.protobuf.message.Message):
 
     SOURCE_DATASET_FIELD_NUMBER: builtins.int
     DERIVED_DATASET_FIELD_NUMBER: builtins.int
+    TAGS_FIELD_NUMBER: builtins.int
     source_dataset: builtins.str
     """If it is a source dataset, it will have a source dataset name."""
     @property
@@ -754,14 +802,17 @@ class DatasetLineages(google.protobuf.message.Message):
         """If it is a derived dataset, it will have pipeline lineages, one for each
         pipeline in the dataset.
         """
+    @property
+    def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
     def __init__(
         self,
         *,
         source_dataset: builtins.str = ...,
         derived_dataset: global___DatasetPipelineLineages | None = ...,
+        tags: collections.abc.Iterable[builtins.str] | None = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["derived_dataset", b"derived_dataset", "kind", b"kind", "source_dataset", b"source_dataset"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["derived_dataset", b"derived_dataset", "kind", b"kind", "source_dataset", b"source_dataset"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["derived_dataset", b"derived_dataset", "kind", b"kind", "source_dataset", b"source_dataset", "tags", b"tags"]) -> None: ...
     def WhichOneof(self, oneof_group: typing_extensions.Literal["kind", b"kind"]) -> typing_extensions.Literal["source_dataset", "derived_dataset"] | None: ...
 
 global___DatasetLineages = DatasetLineages
diff --git a/fennel/gen/expectations_pb2.py b/fennel/gen/expectations_pb2.py
index 5ef34c29d..864b2c4cc 100644
--- a/fennel/gen/expectations_pb2.py
+++ b/fennel/gen/expectations_pb2.py
@@ -14,7 +14,7 @@
 import fennel.gen.metadata_pb2 as metadata__pb2
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x65xpectations.proto\x12\x19\x66\x65nnel.proto.expectations\x1a\x0emetadata.proto\"\xb1\x02\n\x0c\x45xpectations\x12\x13\n\x0b\x65ntity_name\x18\x01 \x01(\t\x12\r\n\x05suite\x18\x02 \x01(\t\x12<\n\x0c\x65xpectations\x18\x03 \x03(\x0b\x32&.fennel.proto.expectations.Expectation\x12\x0f\n\x07version\x18\x04 \x01(\x05\x12\x31\n\x08metadata\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x42\n\x06\x65_type\x18\x06 \x01(\x0e\x32\x32.fennel.proto.expectations.Expectations.EntityType\"7\n\nEntityType\x12\x0b\n\x07\x44\x61taset\x10\x00\x12\x0e\n\nFeatureset\x10\x01\x12\x0c\n\x08Pipeline\x10\x02\"C\n\x0b\x45xpectation\x12\x18\n\x10\x65xpectation_type\x18\x01 \x01(\t\x12\x1a\n\x12\x65xpectation_kwargs\x18\x02 \x01(\tb\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x65xpectations.proto\x12\x19\x66\x65nnel.proto.expectations\x1a\x0emetadata.proto\"\xbf\x02\n\x0c\x45xpectations\x12\x13\n\x0b\x65ntity_name\x18\x01 \x01(\t\x12\r\n\x05suite\x18\x02 \x01(\t\x12<\n\x0c\x65xpectations\x18\x03 \x03(\x0b\x32&.fennel.proto.expectations.Expectation\x12\x0f\n\x07version\x18\x04 \x01(\x05\x12\x31\n\x08metadata\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x42\n\x06\x65_type\x18\x06 \x01(\x0e\x32\x32.fennel.proto.expectations.Expectations.EntityType\x12\x0c\n\x04tags\x18\x07 \x03(\t\"7\n\nEntityType\x12\x0b\n\x07\x44\x61taset\x10\x00\x12\x0e\n\nFeatureset\x10\x01\x12\x0c\n\x08Pipeline\x10\x02\"C\n\x0b\x45xpectation\x12\x18\n\x10\x65xpectation_type\x18\x01 \x01(\t\x12\x1a\n\x12\x65xpectation_kwargs\x18\x02 \x01(\tb\x06proto3')
 
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -22,9 +22,9 @@
 if _descriptor._USE_C_DESCRIPTORS == False:
   DESCRIPTOR._options = None
   _globals['_EXPECTATIONS']._serialized_start=66
-  _globals['_EXPECTATIONS']._serialized_end=371
-  _globals['_EXPECTATIONS_ENTITYTYPE']._serialized_start=316
-  _globals['_EXPECTATIONS_ENTITYTYPE']._serialized_end=371
-  _globals['_EXPECTATION']._serialized_start=373
-  _globals['_EXPECTATION']._serialized_end=440
+  _globals['_EXPECTATIONS']._serialized_end=385
+  _globals['_EXPECTATIONS_ENTITYTYPE']._serialized_start=330
+  _globals['_EXPECTATIONS_ENTITYTYPE']._serialized_end=385
+  _globals['_EXPECTATION']._serialized_start=387
+  _globals['_EXPECTATION']._serialized_end=454
 # @@protoc_insertion_point(module_scope)
diff --git a/fennel/gen/expectations_pb2.pyi b/fennel/gen/expectations_pb2.pyi
index e94138a1c..49e779478 100644
--- a/fennel/gen/expectations_pb2.pyi
+++ b/fennel/gen/expectations_pb2.pyi
@@ -44,6 +44,7 @@ class Expectations(google.protobuf.message.Message):
     VERSION_FIELD_NUMBER: builtins.int
     METADATA_FIELD_NUMBER: builtins.int
     E_TYPE_FIELD_NUMBER: builtins.int
+    TAGS_FIELD_NUMBER: builtins.int
     entity_name: builtins.str
     suite: builtins.str
     @property
@@ -52,6 +53,8 @@ class Expectations(google.protobuf.message.Message):
     @property
     def metadata(self) -> metadata_pb2.Metadata: ...
     e_type: global___Expectations.EntityType.ValueType
+    @property
+    def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
     def __init__(
         self,
         *,
@@ -61,9 +64,10 @@ class Expectations(google.protobuf.message.Message):
         version: builtins.int = ...,
         metadata: metadata_pb2.Metadata | None = ...,
         e_type: global___Expectations.EntityType.ValueType = ...,
+        tags: collections.abc.Iterable[builtins.str] | None = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["metadata", b"metadata"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["e_type", b"e_type", "entity_name", b"entity_name", "expectations", b"expectations", "metadata", b"metadata", "suite", b"suite", "version", b"version"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["e_type", b"e_type", "entity_name", b"entity_name", "expectations", b"expectations", "metadata", b"metadata", "suite", b"suite", "tags", b"tags", "version", b"version"]) -> None: ...
 
 global___Expectations = Expectations
 
diff --git a/fennel/gen/featureset_pb2.py b/fennel/gen/featureset_pb2.py
index 0cb0eeb52..5728c1c73 100644
--- a/fennel/gen/featureset_pb2.py
+++ b/fennel/gen/featureset_pb2.py
@@ -16,27 +16,27 @@
 import fennel.gen.pycode_pb2 as pycode__pb2
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x66\x65\x61tureset.proto\x12\x17\x66\x65nnel.proto.featureset\x1a\x0emetadata.proto\x1a\x0cschema.proto\x1a\x0cpycode.proto\"~\n\x0e\x43oreFeatureset\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x08metadata\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\"\x9e\x01\n\x07\x46\x65\x61ture\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\t\x12,\n\x05\x64type\x18\x03 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x05 \x01(\t\"S\n\x0f\x46ieldLookupInfo\x12)\n\x05\x66ield\x18\x01 \x01(\x0b\x32\x1a.fennel.proto.schema.Field\x12\x15\n\rdefault_value\x18\x03 \x01(\t\"\x92\x03\n\tExtractor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tasets\x18\x02 \x03(\t\x12.\n\x06inputs\x18\x03 \x03(\x0b\x32\x1e.fennel.proto.featureset.Input\x12\x10\n\x08\x66\x65\x61tures\x18\x04 \x03(\t\x12\x31\n\x08metadata\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x0f\n\x07version\x18\x06 \x01(\x05\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x08 \x01(\t\x12>\n\x0e\x65xtractor_type\x18\t \x01(\x0e\x32&.fennel.proto.featureset.ExtractorType\x12>\n\nfield_info\x18\n \x01(\x0b\x32(.fennel.proto.featureset.FieldLookupInfoH\x00\x42\x18\n\x16\x64\x65rived_extractor_info\"s\n\x05Input\x12\x37\n\x07\x66\x65\x61ture\x18\x01 \x01(\x0b\x32&.fennel.proto.featureset.Input.Feature\x1a\x31\n\x07\x46\x65\x61ture\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\"\xad\x01\n\x05Model\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x30\n\x06inputs\x18\x02 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x07outputs\x18\x03 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata*3\n\rExtractorType\x12\x0b\n\x07PY_FUNC\x10\x00\x12\n\n\x06LOOKUP\x10\x01\x12\t\n\x05\x41LIAS\x10\x02\x62\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x66\x65\x61tureset.proto\x12\x17\x66\x65nnel.proto.featureset\x1a\x0emetadata.proto\x1a\x0cschema.proto\x1a\x0cpycode.proto\"\x8c\x01\n\x0e\x43oreFeatureset\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x08metadata\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x0c\n\x04tags\x18\x04 \x03(\t\"\xac\x01\n\x07\x46\x65\x61ture\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\t\x12,\n\x05\x64type\x18\x03 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x05 \x01(\t\x12\x0c\n\x04tags\x18\x06 \x03(\t\"S\n\x0f\x46ieldLookupInfo\x12)\n\x05\x66ield\x18\x01 \x01(\x0b\x32\x1a.fennel.proto.schema.Field\x12\x15\n\rdefault_value\x18\x03 \x01(\t\"\xa0\x03\n\tExtractor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tasets\x18\x02 \x03(\t\x12.\n\x06inputs\x18\x03 \x03(\x0b\x32\x1e.fennel.proto.featureset.Input\x12\x10\n\x08\x66\x65\x61tures\x18\x04 \x03(\t\x12\x31\n\x08metadata\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x0f\n\x07version\x18\x06 \x01(\x05\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x08 \x01(\t\x12>\n\x0e\x65xtractor_type\x18\t \x01(\x0e\x32&.fennel.proto.featureset.ExtractorType\x12>\n\nfield_info\x18\n \x01(\x0b\x32(.fennel.proto.featureset.FieldLookupInfoH\x00\x12\x0c\n\x04tags\x18\x0b \x03(\tB\x18\n\x16\x64\x65rived_extractor_info\"s\n\x05Input\x12\x37\n\x07\x66\x65\x61ture\x18\x01 \x01(\x0b\x32&.fennel.proto.featureset.Input.Feature\x1a\x31\n\x07\x46\x65\x61ture\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\"\xad\x01\n\x05Model\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x30\n\x06inputs\x18\x02 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x07outputs\x18\x03 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata*3\n\rExtractorType\x12\x0b\n\x07PY_FUNC\x10\x00\x12\n\n\x06LOOKUP\x10\x01\x12\t\n\x05\x41LIAS\x10\x02\x62\x06proto3')
 
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
 _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'featureset_pb2', _globals)
 if _descriptor._USE_C_DESCRIPTORS == False:
   DESCRIPTOR._options = None
-  _globals['_EXTRACTORTYPE']._serialized_start=1161
-  _globals['_EXTRACTORTYPE']._serialized_end=1212
-  _globals['_COREFEATURESET']._serialized_start=89
-  _globals['_COREFEATURESET']._serialized_end=215
-  _globals['_FEATURE']._serialized_start=218
-  _globals['_FEATURE']._serialized_end=376
-  _globals['_FIELDLOOKUPINFO']._serialized_start=378
-  _globals['_FIELDLOOKUPINFO']._serialized_end=461
-  _globals['_EXTRACTOR']._serialized_start=464
-  _globals['_EXTRACTOR']._serialized_end=866
-  _globals['_INPUT']._serialized_start=868
-  _globals['_INPUT']._serialized_end=983
-  _globals['_INPUT_FEATURE']._serialized_start=934
-  _globals['_INPUT_FEATURE']._serialized_end=983
-  _globals['_MODEL']._serialized_start=986
-  _globals['_MODEL']._serialized_end=1159
+  _globals['_EXTRACTORTYPE']._serialized_start=1204
+  _globals['_EXTRACTORTYPE']._serialized_end=1255
+  _globals['_COREFEATURESET']._serialized_start=90
+  _globals['_COREFEATURESET']._serialized_end=230
+  _globals['_FEATURE']._serialized_start=233
+  _globals['_FEATURE']._serialized_end=405
+  _globals['_FIELDLOOKUPINFO']._serialized_start=407
+  _globals['_FIELDLOOKUPINFO']._serialized_end=490
+  _globals['_EXTRACTOR']._serialized_start=493
+  _globals['_EXTRACTOR']._serialized_end=909
+  _globals['_INPUT']._serialized_start=911
+  _globals['_INPUT']._serialized_end=1026
+  _globals['_INPUT_FEATURE']._serialized_start=977
+  _globals['_INPUT_FEATURE']._serialized_end=1026
+  _globals['_MODEL']._serialized_start=1029
+  _globals['_MODEL']._serialized_end=1202
 # @@protoc_insertion_point(module_scope)
diff --git a/fennel/gen/featureset_pb2.pyi b/fennel/gen/featureset_pb2.pyi
index b8db2e6b1..4525da808 100644
--- a/fennel/gen/featureset_pb2.pyi
+++ b/fennel/gen/featureset_pb2.pyi
@@ -47,20 +47,24 @@ class CoreFeatureset(google.protobuf.message.Message):
     NAME_FIELD_NUMBER: builtins.int
     METADATA_FIELD_NUMBER: builtins.int
     PYCODE_FIELD_NUMBER: builtins.int
+    TAGS_FIELD_NUMBER: builtins.int
     name: builtins.str
     @property
     def metadata(self) -> metadata_pb2.Metadata: ...
     @property
     def pycode(self) -> pycode_pb2.PyCode: ...
+    @property
+    def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
     def __init__(
         self,
         *,
         name: builtins.str = ...,
         metadata: metadata_pb2.Metadata | None = ...,
         pycode: pycode_pb2.PyCode | None = ...,
+        tags: collections.abc.Iterable[builtins.str] | None = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["metadata", b"metadata", "pycode", b"pycode"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["metadata", b"metadata", "name", b"name", "pycode", b"pycode"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["metadata", b"metadata", "name", b"name", "pycode", b"pycode", "tags", b"tags"]) -> None: ...
 
 global___CoreFeatureset = CoreFeatureset
 
@@ -73,6 +77,7 @@ class Feature(google.protobuf.message.Message):
     DTYPE_FIELD_NUMBER: builtins.int
     METADATA_FIELD_NUMBER: builtins.int
     FEATURE_SET_NAME_FIELD_NUMBER: builtins.int
+    TAGS_FIELD_NUMBER: builtins.int
     id: builtins.int
     name: builtins.str
     @property
@@ -80,6 +85,8 @@ class Feature(google.protobuf.message.Message):
     @property
     def metadata(self) -> metadata_pb2.Metadata: ...
     feature_set_name: builtins.str
+    @property
+    def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
     def __init__(
         self,
         *,
@@ -88,9 +95,10 @@ class Feature(google.protobuf.message.Message):
         dtype: schema_pb2.DataType | None = ...,
         metadata: metadata_pb2.Metadata | None = ...,
         feature_set_name: builtins.str = ...,
+        tags: collections.abc.Iterable[builtins.str] | None = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["dtype", b"dtype", "metadata", b"metadata"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["dtype", b"dtype", "feature_set_name", b"feature_set_name", "id", b"id", "metadata", b"metadata", "name", b"name"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["dtype", b"dtype", "feature_set_name", b"feature_set_name", "id", b"id", "metadata", b"metadata", "name", b"name", "tags", b"tags"]) -> None: ...
 
 global___Feature = Feature
 
@@ -129,6 +137,7 @@ class Extractor(google.protobuf.message.Message):
     FEATURE_SET_NAME_FIELD_NUMBER: builtins.int
     EXTRACTOR_TYPE_FIELD_NUMBER: builtins.int
     FIELD_INFO_FIELD_NUMBER: builtins.int
+    TAGS_FIELD_NUMBER: builtins.int
     name: builtins.str
     @property
     def datasets(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
@@ -150,6 +159,8 @@ class Extractor(google.protobuf.message.Message):
         """pycode excluded from the oneof for better bwd compatibility in Rust
         required iff extractor_type == LOOKUP
         """
+    @property
+    def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
     def __init__(
         self,
         *,
@@ -163,9 +174,10 @@ class Extractor(google.protobuf.message.Message):
         feature_set_name: builtins.str = ...,
         extractor_type: global___ExtractorType.ValueType = ...,
         field_info: global___FieldLookupInfo | None = ...,
+        tags: collections.abc.Iterable[builtins.str] | None = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["derived_extractor_info", b"derived_extractor_info", "field_info", b"field_info", "metadata", b"metadata", "pycode", b"pycode"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["datasets", b"datasets", "derived_extractor_info", b"derived_extractor_info", "extractor_type", b"extractor_type", "feature_set_name", b"feature_set_name", "features", b"features", "field_info", b"field_info", "inputs", b"inputs", "metadata", b"metadata", "name", b"name", "pycode", b"pycode", "version", b"version"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["datasets", b"datasets", "derived_extractor_info", b"derived_extractor_info", "extractor_type", b"extractor_type", "feature_set_name", b"feature_set_name", "features", b"features", "field_info", b"field_info", "inputs", b"inputs", "metadata", b"metadata", "name", b"name", "pycode", b"pycode", "tags", b"tags", "version", b"version"]) -> None: ...
     def WhichOneof(self, oneof_group: typing_extensions.Literal["derived_extractor_info", b"derived_extractor_info"]) -> typing_extensions.Literal["field_info"] | None: ...
 
 global___Extractor = Extractor
diff --git a/fennel/lib/schema/schema.py b/fennel/lib/schema/schema.py
index 0dd95a14c..3c10d7179 100644
--- a/fennel/lib/schema/schema.py
+++ b/fennel/lib/schema/schema.py
@@ -46,9 +46,13 @@ def _is_optional(field):
     return _get_origin(field) is Union and type(None) in _get_args(field)
 
 
+def _optional_inner(type_):
+    return _get_args(type_)[0]
+
+
 def dtype_to_string(type_: Any) -> str:
     if _is_optional(type_):
-        return f"Optional[{dtype_to_string(_get_args(type_)[0])}]"
+        return f"Optional[{dtype_to_string(_optional_inner(type_))}]"
     if isinstance(type_, type):
         return type_.__name__
     return str(type_)
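`_optional_inner` just peels the payload type out of `Optional[T]` (i.e. the first argument of `Union[T, None]`), so `dtype_to_string` behaves exactly as before. A small sketch of the expected behavior, assuming the helpers keep the signatures shown above:

```python
# Sketch: dtype_to_string unwraps Optional via _optional_inner and recurses.
from typing import List, Optional

from fennel.lib.schema.schema import dtype_to_string

assert dtype_to_string(int) == "int"
assert dtype_to_string(Optional[int]) == "Optional[int]"
# Non-class generics fall through to str(type_):
assert dtype_to_string(Optional[List[int]]) == "Optional[typing.List[int]]"
```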
diff --git a/fennel/lib/to_proto/serializer.py b/fennel/lib/to_proto/serializer.py
index d739f2ab3..356674ec1 100644
--- a/fennel/lib/to_proto/serializer.py
+++ b/fennel/lib/to_proto/serializer.py
@@ -259,6 +259,17 @@ def visitDrop(self, obj):
             ),
         )
 
+    def visitDropNull(self, obj):
+        return proto.Operator(
+            id=obj.signature(),
+            is_root=(obj == self.terminal_node),
+            pipeline_name=self.pipeline_name,
+            dataset_name=self.dataset_name,
+            dropnull=proto.Dropnull(
+                operand_id=self.visit(obj.node), columns=obj.columns
+            ),
+        )
+
     def visitRename(self, obj):
         return proto.Operator(
             id=obj.signature(),
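The serializer change mirrors the existing `visitDrop`: the node's signature becomes the operator id, and the parent node is visited first to obtain `operand_id`. Roughly, the emitted proto has this shape (the ids and names below are made-up placeholders, not real output):

```python
# Illustrative only: the shape of the Operator that visitDropNull returns.
op = proto.Operator(
    id="<signature-of-dropnull-node>",       # obj.signature()
    is_root=False,                           # True only for the terminal node
    pipeline_name="create_user_transactions",
    dataset_name="UserTransactions",
    dropnull=proto.Dropnull(
        operand_id="<id-of-upstream-operator>",  # self.visit(obj.node)
        columns=["amount"],
    ),
)
```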
diff --git a/fennel/test_lib/executor.py b/fennel/test_lib/executor.py
index 525e0e905..1bddf5836 100644
--- a/fennel/test_lib/executor.py
+++ b/fennel/test_lib/executor.py
@@ -379,11 +379,19 @@ def visitDrop(self, obj):
 
         return NodeRet(df, input_ret.timestamp_field, input_ret.key_fields)
 
-    def visitAssign(self, obj):
+    def visitDropNull(self, obj):
         input_ret = self.visit(obj.node)
         if input_ret is None:
             return None
         df = input_ret.df
+        df = df.dropna(subset=obj.columns)
+        return NodeRet(df, input_ret.timestamp_field, input_ret.key_fields)
+
+    def visitAssign(self, obj):
+        input_ret = self.visit(obj.node)
+        if input_ret is None:
+            return None
+        df = input_ret.df
         df[obj.column] = obj.func(df)
         return NodeRet(df, input_ret.timestamp_field, input_ret.key_fields)
 
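In the mock executor, `dropnull` is simply `DataFrame.dropna` scoped to the requested columns: a row is dropped only if one of those columns is null, and nulls elsewhere survive. A standalone sketch of those semantics with made-up data:

```python
import pandas as pd

df = pd.DataFrame(
    {
        "user_id": [1, 2, 3],
        "amount": [10.0, None, 7.5],
        "note": [None, "refund", "chargeback"],
    }
)
# Mirrors visitDropNull above: only nulls in the listed columns cause a drop.
out = df.dropna(subset=["amount"])
assert out["user_id"].tolist() == [1, 3]
assert out["note"].isna().tolist() == [True, False]  # other columns untouched
```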
diff --git a/fennel/utils.py b/fennel/utils.py
index 265aec3ee..072b2a4a2 100644
--- a/fennel/utils.py
+++ b/fennel/utils.py
@@ -10,10 +10,11 @@
 import textwrap
 
 from pandas import DataFrame
-from typing import Any
+from typing import Any, Type
 from typing import cast, Callable, Dict, List, Tuple, Union
 
-import fennel._vendor.astunparse as astunparse  # type: ignore
+import fennel._vendor.astunparse as astunparse
+from fennel._vendor.pydantic.typing import get_args, get_origin  # type: ignore
 import fennel._vendor.requests as requests  # type: ignore
 
 Tags = Union[List[str], Tuple[str, ...], str]
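The diff pulls `get_args`/`get_origin` into `fennel.utils` from the vendored pydantic typing shims. Assuming the shims match the stdlib `typing` backports they wrap, they report origins and type arguments like this:

```python
from typing import List, Optional, Union

from fennel._vendor.pydantic.typing import get_args, get_origin

assert get_origin(List[int]) is list
assert get_args(List[int]) == (int,)
# Optional[T] is sugar for Union[T, None]:
assert get_origin(Optional[str]) is Union
assert get_args(Optional[str]) == (str, type(None))
```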
diff --git a/pyproject.toml b/pyproject.toml
index a80b5f76f..cc6613fe4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "fennel-ai"
-version = "0.18.7"
+version = "0.18.8"
 description = "The modern realtime feature engineering platform"
 authors = ["Fennel AI "]
 packages = [{ include = "fennel" }]