Skip to content

Commit

Permalink
Merge pull request #81 from azaurus1/adds-filterconfig-to-table-resource
Browse files Browse the repository at this point in the history
Adds filterconfig to table resource
  • Loading branch information
azaurus1 authored Apr 12, 2024
2 parents 153cc24 + 0c83ca8 commit 066a239
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 40 deletions.
9 changes: 9 additions & 0 deletions docs/resources/table.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,20 @@ Optional:
Optional:

- `continue_on_error` (Boolean) continue after error ingesting.
- `filter_config` (Attributes) filter configuration (see [below for nested schema](#nestedatt--ingestion_config--filter_config))
- `row_time_value_check` (Boolean) row time value check.
- `segment_time_value_check` (Boolean) segment time value check.
- `stream_ingestion_config` (Attributes) stream ingestion configurations (see [below for nested schema](#nestedatt--ingestion_config--stream_ingestion_config))
- `transform_configs` (Attributes List) transform configurations (see [below for nested schema](#nestedatt--ingestion_config--transform_configs))

<a id="nestedatt--ingestion_config--filter_config"></a>
### Nested Schema for `ingestion_config.filter_config`

Optional:

- `filter_function` (String) filter function


<a id="nestedatt--ingestion_config--stream_ingestion_config"></a>
### Nested Schema for `ingestion_config.stream_ingestion_config`

Expand Down
6 changes: 6 additions & 0 deletions examples/tables/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ locals {
join("_", [for keyName in regexall("[A-Z]?[a-z]+", key) : lower(keyName)]) => value
}

filter_config = {
for key, value in local.ingestion_config["filter_config"] :
join("_", [for keyName in regexall("[A-Z]?[a-z]+", key) : lower(keyName)]) => value
}

stream_ingestion_config = {
for key, value in local.ingestion_config["stream_ingestion_config"] :
join("_", [for keyName in regexall("[A-Z]?[a-z]+", key) : lower(keyName)]) => value
Expand Down Expand Up @@ -147,6 +152,7 @@ resource "pinot_table" "realtime_table" {
row_time_value_check = true
stream_ingestion_config = local.parsed_stream_ingestion_config
transform_configs = local.transform_configs
filter_config = local.filter_config
})


Expand Down
5 changes: 4 additions & 1 deletion examples/tables/realtime_table_example.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@
"columnName": "hash_json",
"transformFunction": "json_format(hash)"
}
]
],
"filterConfig": {
"filterFunction": "mod(number, 10) >=5"
}
},
"tierConfigs": [
{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module terraform-provider-pinot
go 1.22.0

require (
github.com/azaurus1/go-pinot-api v0.4.1
github.com/azaurus1/go-pinot-api v0.4.2
github.com/azaurus1/pinot-testContainer v0.0.0-20240403033323-8a28c4c0636d
github.com/hashicorp/terraform-plugin-docs v0.18.0
github.com/hashicorp/terraform-plugin-framework v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/azaurus1/go-pinot-api v0.4.1 h1:4XnOs+0qPceS0ObvIK+HrlQdoWL4xttbwH1ZW8hjJo8=
github.com/azaurus1/go-pinot-api v0.4.1/go.mod h1:Zxi9gp5nAlU95XH1FQlAXLzH/cLGdX6KaisGtVF56t0=
github.com/azaurus1/go-pinot-api v0.4.2 h1:VrGsXi4i0kAokLacmu8IV18zokGVhuoQOq2xY/SIP6Q=
github.com/azaurus1/go-pinot-api v0.4.2/go.mod h1:Zxi9gp5nAlU95XH1FQlAXLzH/cLGdX6KaisGtVF56t0=
github.com/azaurus1/pinot-testContainer v0.0.0-20240403033323-8a28c4c0636d h1:kHkQOHQk8+uHTad97uBhNhg0vEmYPhqeFUqoHxGescU=
github.com/azaurus1/pinot-testContainer v0.0.0-20240403033323-8a28c4c0636d/go.mod h1:SG7Smj9VxxeB5xy90nHOjAqUVS1D4cWMOwgph2NPQqg=
github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY=
Expand Down
97 changes: 59 additions & 38 deletions internal/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,49 @@ func SetStateFromTable(ctx context.Context, state *models.TableResourceModel, ta

state.TableName = types.StringValue(table.TableName)
state.TableType = types.StringValue(table.TableType)
state.IsDimTable = types.BoolValue(table.IsDimTable)

state.TenantsConfig = &models.TenantsConfig{
Broker: types.StringValue(table.Tenants.Broker),
Server: types.StringValue(table.Tenants.Server),
}

state.TenantsConfig = convertTenantConfig(table)
state.SegmentsConfig = convertSegmentsConfig(table)
state.IngestionConfig = convertIngestionConfig(table)
state.TierConfigs = convertTierConfigs(table)
state.TierConfigs = convertTierConfigs(table)
state.Metadata = convertMetadata(table)

tableIndexConfig, resultDiags := convertTableIndexConfig(ctx, table)
if resultDiags.HasError() {
diags.Append(resultDiags...)
}
state.TableIndexConfig = tableIndexConfig

var ingestionTransformConfigs []*models.TransformConfig
for _, transformConfig := range table.IngestionConfig.TransformConfigs {
ingestionTransformConfigs = append(ingestionTransformConfigs, &models.TransformConfig{
ColumnName: types.StringValue(transformConfig.ColumnName),
TransformFunction: types.StringValue(transformConfig.TransformFunction),
})
// Routing Config
if table.Routing != nil {
routingConfig, routingDiags := convertRoutingConfig(ctx, table)
if routingDiags.HasError() {
diags.Append(routingDiags...)
}
state.Routing = routingConfig
}

state.IngestionConfig = &models.IngestionConfig{
SegmentTimeValueCheck: types.BoolValue(table.IngestionConfig.SegmentTimeValueCheck),
RowTimeValueCheck: types.BoolValue(table.IngestionConfig.RowTimeValueCheck),
ContinueOnError: types.BoolValue(table.IngestionConfig.ContinueOnError),
StreamIngestionConfig: &models.StreamIngestionConfig{
StreamConfigMaps: table.IngestionConfig.StreamIngestionConfig.StreamConfigMaps,
},
TransformConfigs: ingestionTransformConfigs,
// Upsert Config
if table.UpsertConfig != nil {
upsertConfig, upsertDiags := convertUpsertConfig(ctx, table)
if upsertDiags.HasError() {
diags.Append(upsertDiags...)
}
state.UpsertConfig = upsertConfig
}

return diags
}

func convertMetadata(table *model.Table) *models.Metadata {
return &models.Metadata{
CustomConfigs: table.Metadata.CustomConfigs,
}
}

func convertTierConfigs(table *model.Table) []*models.TierConfig {
var tierConfigs []*models.TierConfig
for _, tierConfig := range table.TierConfigs {
tierConfigs = append(tierConfigs, &models.TierConfig{
Expand All @@ -58,33 +69,43 @@ func SetStateFromTable(ctx context.Context, state *models.TableResourceModel, ta
})
}

state.TierConfigs = tierConfigs

state.IsDimTable = types.BoolValue(table.IsDimTable)
return tierConfigs
}

state.Metadata = &models.Metadata{
CustomConfigs: table.Metadata.CustomConfigs,
func convertTenantConfig(table *model.Table) *models.TenantsConfig {
return &models.TenantsConfig{
Broker: types.StringValue(table.Tenants.Broker),
Server: types.StringValue(table.Tenants.Server),
}
}

// Routing Config
if table.Routing != nil {
routingConfig, routingDiags := convertRoutingConfig(ctx, table)
if routingDiags.HasError() {
diags.Append(routingDiags...)
}
state.Routing = routingConfig
func convertIngestionConfig(table *model.Table) *models.IngestionConfig {

var transformConfigs []*models.TransformConfig
for _, transformConfig := range table.IngestionConfig.TransformConfigs {
transformConfigs = append(transformConfigs, &models.TransformConfig{
ColumnName: types.StringValue(transformConfig.ColumnName),
TransformFunction: types.StringValue(transformConfig.TransformFunction),
})
}

// Upsert Config
if table.UpsertConfig != nil {
upsertConfig, upsertDiags := convertUpsertConfig(ctx, table)
if upsertDiags.HasError() {
diags.Append(upsertDiags...)
var filterConfig *models.FilterConfig
if table.IngestionConfig.FilterConfig != nil {
filterConfig = &models.FilterConfig{
FilterFunction: types.StringValue(table.IngestionConfig.FilterConfig.FilterFunction),
}
state.UpsertConfig = upsertConfig
}

return diags
return &models.IngestionConfig{
SegmentTimeValueCheck: types.BoolValue(table.IngestionConfig.SegmentTimeValueCheck),
RowTimeValueCheck: types.BoolValue(table.IngestionConfig.RowTimeValueCheck),
ContinueOnError: types.BoolValue(table.IngestionConfig.ContinueOnError),
StreamIngestionConfig: &models.StreamIngestionConfig{
StreamConfigMaps: table.IngestionConfig.StreamIngestionConfig.StreamConfigMaps,
},
TransformConfigs: transformConfigs,
FilterConfig: filterConfig,
}
}

func convertSegmentPartitionConfig(table *model.Table) *models.SegmentPartitionConfig {
Expand Down
6 changes: 6 additions & 0 deletions internal/converter/override.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ func ToIngestionConfig(plan *models.TableResourceModel) *model.TableIngestionCon
ingestionConfig.TransformConfigs = transformConfigs
}

if plan.IngestionConfig.FilterConfig != nil {
ingestionConfig.FilterConfig = &model.FilterConfig{
FilterFunction: plan.IngestionConfig.FilterConfig.FilterFunction.ValueString(),
}
}

return &ingestionConfig
}

Expand Down
5 changes: 5 additions & 0 deletions internal/models/pinot.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,17 @@ type TransformConfig struct {
TransformFunction types.String `tfsdk:"transform_function"`
}

type FilterConfig struct {
FilterFunction types.String `tfsdk:"filter_function"`
}

type IngestionConfig struct {
SegmentTimeValueCheck types.Bool `tfsdk:"segment_time_value_check"`
RowTimeValueCheck types.Bool `tfsdk:"row_time_value_check"`
ContinueOnError types.Bool `tfsdk:"continue_on_error"`
StreamIngestionConfig *StreamIngestionConfig `tfsdk:"stream_ingestion_config"`
TransformConfigs []*TransformConfig `tfsdk:"transform_configs"`
FilterConfig *FilterConfig `tfsdk:"filter_config"`
}

type StreamIngestionConfig struct {
Expand Down
10 changes: 10 additions & 0 deletions internal/tf_schema/tables_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ func IngestionConfig() schema.SingleNestedAttribute {
},
},
},
"filter_config": schema.SingleNestedAttribute{
Description: "filter configuration",
Optional: true,
Attributes: map[string]schema.Attribute{
"filter_function": schema.StringAttribute{
Description: "filter function",
Optional: true,
},
},
},
},
}
}
Expand Down

0 comments on commit 066a239

Please sign in to comment.