Merge pull request #446 from GispoCoding/445-fixmodify-pca-tools
Fix and update PCA tools
nmaarnio authored Oct 28, 2024
2 parents dfd3bbd + 82db606 commit ccc2dad
Showing 3 changed files with 134 additions and 65 deletions.
52 changes: 33 additions & 19 deletions eis_toolkit/cli.py
@@ -701,7 +701,7 @@ def parallel_coordinates_cli(
 def compute_pca_raster_cli(
     input_rasters: INPUT_FILES_ARGUMENT,
     output_raster: OUTPUT_FILE_OPTION,
-    number_of_components: int = typer.Option(),
+    number_of_components: Optional[int] = None,
     # NOTE: Omitted scaler type selection here since the parameter might be deleted from PCA func
     nodata_handling: Annotated[NodataHandling, typer.Option(case_sensitive=False)] = NodataHandling.remove,
     # NOTE: Omitted nodata parameter. Should use raster nodata.
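The parameter change above also alters the CLI contract: `typer.Option()` with no default makes `--number-of-components` a required flag, while a plain `Optional[int] = None` default turns it into an optional one. A minimal sketch of the new behavior (a hypothetical standalone app, not part of this commit):

```python
from typing import Optional

import typer

app = typer.Typer()


@app.command()
def compute_pca_demo(number_of_components: Optional[int] = None) -> None:
    # With a plain None default, Typer exposes an optional --number-of-components
    # flag; omitting it now means "keep all components" further down the stack.
    typer.echo(f"number_of_components: {number_of_components}")


if __name__ == "__main__":
    app()
```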
@@ -715,27 +715,34 @@ def compute_pca_raster_cli(
     stacked_array, profiles = read_and_stack_rasters(input_rasters, nodata_handling="convert_to_nan")
     typer.echo("Progress: 25%")

-    pca_array, variance_ratios = compute_pca(
+    transformed_data, principal_components, variances, variance_ratios = compute_pca(
         data=stacked_array, number_of_components=number_of_components, nodata_handling=get_enum_values(nodata_handling)
     )

     # Fill np.nan with nodata before writing data to raster
-    pca_array[pca_array == np.nan] = -9999
+    transformed_data[transformed_data == np.nan] = -9999
     out_profile = profiles[0]
     out_profile["nodata"] = -9999

     # Update nr of bands
-    out_profile["count"] = number_of_components
+    out_profile["count"] = len(variances)

     # Create dictionary from the variance ratios array
-    variances_ratios_dict = {}
-    for i, variance_ratio in enumerate(variance_ratios):
-        name = "PC " + str(i) + " explained variance"
-        variances_ratios_dict[name] = variance_ratio
-    json_str = json.dumps(variances_ratios_dict)
+    # variances_ratios_dict = {}
+    # for i, variance_ratio in enumerate(variance_ratios):
+    #     name = "PC " + str(i) + " explained variance"
+    #     variances_ratios_dict[name] = variance_ratio
+    # json_str = json.dumps(variances_ratios_dict)
+
+    out_dict = {
+        "principal_components": np.round(principal_components, 4).tolist(),
+        "explained_variances": np.round(variances, 4).tolist(),
+        "explained_variance_ratios": np.round(variance_ratios, 4).tolist(),
+    }
+    json_str = json.dumps(out_dict)

     with rasterio.open(output_raster, "w", **out_profile) as dst:
-        dst.write(pca_array)
+        dst.write(transformed_data)

     typer.echo("Progress: 100%")

@@ -748,7 +755,7 @@ def compute_pca_raster_cli(
 def compute_pca_vector_cli(
     input_vector: INPUT_FILE_OPTION,
     output_vector: OUTPUT_FILE_OPTION,
-    number_of_components: int = typer.Option(),
+    number_of_components: Optional[int] = None,
     columns: Annotated[List[str], typer.Option()] = None,
     # NOTE: Omitted scaler type selection here since the parameter might be deleted from PCA func
     nodata_handling: Annotated[NodataHandling, typer.Option(case_sensitive=False)] = NodataHandling.remove,
@@ -762,7 +769,7 @@ def compute_pca_vector_cli(
     gdf = gpd.read_file(input_vector)
     typer.echo("Progress: 25%")

-    pca_gdf, variance_ratios = compute_pca(
+    transformed_data, principal_components, variances, variance_ratios = compute_pca(
         data=gdf,
         number_of_components=number_of_components,
         columns=columns,
@@ -771,13 +778,20 @@ def compute_pca_vector_cli(
     )

     # Create dictionary from the variance ratios array
-    variances_ratios_dict = {}
-    for i, variance_ratio in enumerate(variance_ratios):
-        name = "PC " + str(i) + " explained variance"
-        variances_ratios_dict[name] = variance_ratio
-    json_str = json.dumps(variances_ratios_dict)
-
-    pca_gdf.to_file(output_vector)
+    # variances_ratios_dict = {}
+    # for i, variance_ratio in enumerate(variance_ratios):
+    #     name = "PC " + str(i) + " explained variance"
+    #     variances_ratios_dict[name] = variance_ratio
+    # json_str = json.dumps(variances_ratios_dict)
+
+    out_dict = {
+        "principal_components": np.round(principal_components, 4).tolist(),
+        "explained_variances": np.round(variances, 4).tolist(),
+        "explained_variance_ratios": np.round(variance_ratios, 4).tolist(),
+    }
+    json_str = json.dumps(out_dict)
+
+    transformed_data.to_file(output_vector)
     typer.echo("Progress: 100%")

     typer.echo(f"Results: {json_str}")
65 changes: 39 additions & 26 deletions eis_toolkit/exploratory_analyses/pca.py
@@ -59,28 +59,30 @@ def _handle_missing_values(
 @beartype
 def _compute_pca(
     feature_matrix: np.ndarray, number_of_components: int, scaler_type: str
-) -> Tuple[np.ndarray, np.ndarray]:
+) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
     scaler = SCALERS[scaler_type]()
     scaled_data = scaler.fit_transform(feature_matrix)

     pca = PCA(n_components=number_of_components)
-    principal_components = pca.fit_transform(scaled_data)
-    explained_variances = pca.explained_variance_ratio_
+    transformed_data = pca.fit_transform(scaled_data)
+    principal_components = pca.components_
+    explained_variances = pca.explained_variance_
+    explained_variance_ratios = pca.explained_variance_ratio_

-    return principal_components, explained_variances
+    return transformed_data, principal_components, explained_variances, explained_variance_ratios
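This is the core of the fix: the old code returned `pca.fit_transform(...)` under the name `principal_components` and `explained_variance_ratio_` under the name `explained_variances`, conflating three different things. In scikit-learn, `fit_transform` returns the projected data (the scores), `components_` holds the component vectors with shape `(n_components, n_features)`, and `explained_variance_` / `explained_variance_ratio_` hold the absolute and relative variances. A minimal sketch:

```python
import numpy as np
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 5))

pca = PCA(n_components=2)
scores = pca.fit_transform(X)           # transformed data, shape (100, 2)

print(pca.components_.shape)            # (2, 5): one row per component
print(pca.explained_variance_)          # absolute variance per component
print(pca.explained_variance_ratio_)    # fractions of total variance, sum <= 1.0
```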


 @beartype
 def compute_pca(
     data: Union[np.ndarray, pd.DataFrame, gpd.GeoDataFrame],
-    number_of_components: int,
+    number_of_components: Optional[int] = None,
     columns: Optional[Sequence[str]] = None,
     scaler_type: Literal["standard", "min_max", "robust"] = "standard",
     nodata_handling: Literal["remove", "replace"] = "remove",
     nodata: Optional[Number] = None,
-) -> Tuple[Union[np.ndarray, pd.DataFrame, gpd.GeoDataFrame], np.ndarray]:
+) -> Tuple[Union[np.ndarray, pd.DataFrame, gpd.GeoDataFrame], np.ndarray, np.ndarray, np.ndarray]:
     """
-    Compute defined number of principal components for numeric input data.
+    Compute defined number of principal components for numeric input data and transform the data.

     Before computation, data is scaled according to specified scaler and NaN values removed or replaced.
     Optionally, a nodata value can be given to handle similarly as NaN values.
@@ -93,7 +95,8 @@ def compute_pca(
     Args:
         data: Input data for PCA.
         number_of_components: The number of principal components to compute. Should be >= 1 and at most
-            the number of numeric columns if input is (Geo)Dataframe.
+            the number of features found in input data. If not defined, will be the same as number of
+            features in data. Defaults to None.
         columns: Select columns used for the PCA. Other columns are excluded from PCA, but added back
             to the result Dataframe intact. Only relevant if input is (Geo)Dataframe. Defaults to None.
         scaler_type: Transform data according to a specified Sklearn scaler.
@@ -103,8 +106,8 @@ def compute_pca(
         nodata: Define a nodata value to remove. Defaults to None.

     Returns:
-        The computed principal components in corresponding format as the input data and the
-        explained variance ratios for each component.
+        The transformed data in same format as input data, computed principal components, explained variances
+        and explained variance ratios for each component.

     Raises:
         EmptyDataException: The input is empty.
@@ -116,7 +119,7 @@ def compute_pca(
     if scaler_type not in SCALERS:
         raise InvalidParameterValueException(f"Invalid scaler. Choose from: {list(SCALERS.keys())}")

-    if number_of_components < 1:
+    if number_of_components is not None and number_of_components < 1:
         raise InvalidParameterValueException("The number of principal components should be >= 1.")

     # Get feature matrix (Numpy array) from various input types
@@ -158,40 +161,50 @@ def compute_pca(
     feature_matrix = feature_matrix.astype(float)
     feature_matrix, nan_mask = _handle_missing_values(feature_matrix, nodata_handling, nodata)

+    # Default number of components to number of features in data if not defined
+    if number_of_components is None:
+        number_of_components = feature_matrix.shape[1]
+
     if number_of_components > feature_matrix.shape[1]:
-        raise InvalidParameterValueException("The number of principal components is too high for the given input data.")
+        raise InvalidParameterValueException(
+            "The number of principal components is too high for the given input data "
+            + f"({number_of_components} > {feature_matrix.shape[1]})."
+        )

     # Core PCA computation
-    principal_components, explained_variances = _compute_pca(feature_matrix, number_of_components, scaler_type)
+    transformed_data, principal_components, explained_variances, explained_variance_ratios = _compute_pca(
+        feature_matrix, number_of_components, scaler_type
+    )

     if nodata_handling == "remove" and nan_mask is not None:
-        principal_components_with_nans = np.full((nan_mask.size, principal_components.shape[1]), np.nan)
-        principal_components_with_nans[~nan_mask, :] = principal_components
-        principal_components = principal_components_with_nans
+        transformed_data_with_nans = np.full((nan_mask.size, transformed_data.shape[1]), np.nan)
+        transformed_data_with_nans[~nan_mask, :] = transformed_data
+        transformed_data = transformed_data_with_nans
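The renamed block above restores the full row count after `nodata_handling="remove"`: it allocates an all-NaN matrix and scatters the PCA scores back into the rows the mask kept. The same pattern in isolation, with hypothetical values:

```python
import numpy as np

nan_mask = np.array([False, True, False, True])   # rows 1 and 3 were dropped
scores = np.array([[0.5, -1.2], [2.0, 0.3]])      # PCA output for rows 0 and 2

restored = np.full((nan_mask.size, scores.shape[1]), np.nan)
restored[~nan_mask, :] = scores
print(restored)
# [[ 0.5 -1.2]
#  [ nan  nan]
#  [ 2.   0.3]
#  [ nan  nan]]
```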

     # Convert PCA output to proper format
     if isinstance(data, np.ndarray):
         if data.ndim == 3:
-            result_data = principal_components.reshape(rows, cols, -1).transpose(2, 0, 1)
+            transformed_data_out = transformed_data.reshape(rows, cols, -1).transpose(2, 0, 1)
         else:
-            result_data = principal_components
+            transformed_data_out = transformed_data

     elif isinstance(data, pd.DataFrame):
         component_names = [f"principal_component_{i+1}" for i in range(number_of_components)]
-        result_data = pd.DataFrame(data=principal_components, columns=component_names)
+        transformed_data_out = pd.DataFrame(data=transformed_data, columns=component_names)
         if columns is not None:
             old_columns = [column for column in data.columns if column not in columns]
             for column in old_columns:
-                result_data[column] = data[column]
+                transformed_data_out[column] = data[column]
         if isinstance(data, gpd.GeoDataFrame):
-            result_data = gpd.GeoDataFrame(result_data, geometry=geometries, crs=crs)
+            transformed_data_out = gpd.GeoDataFrame(transformed_data_out, geometry=geometries, crs=crs)

-    return result_data, explained_variances
+    return transformed_data_out, principal_components, explained_variances, explained_variance_ratios
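For the 3D case in the ndarray branch above, PCA sees pixels as rows, so the scores come back as `(rows * cols, n_components)` and `reshape(rows, cols, -1).transpose(2, 0, 1)` rebuilds a band-first raster stack. A round-trip sketch of that layout change (the forward flattening happens in code not shown in this diff, so the first step here is the assumed inverse):

```python
import numpy as np

bands, rows, cols = 3, 4, 5
stack = np.arange(bands * rows * cols, dtype=float).reshape(bands, rows, cols)

# Assumed forward step: (bands, rows, cols) -> (pixels, features)
flat = stack.transpose(1, 2, 0).reshape(-1, bands)

# The inverse used in the diff: back to (bands, rows, cols)
back = flat.reshape(rows, cols, -1).transpose(2, 0, 1)
assert np.array_equal(stack, back)
```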


 @beartype
 def plot_pca(
     pca_df: pd.DataFrame,
-    explained_variances: Optional[np.ndarray] = None,
+    explained_variance_ratios: Optional[np.ndarray] = None,
     color_column_name: Optional[str] = None,
     save_path: Optional[str] = None,
 ) -> sns.PairGrid:
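Taken together, callers now unpack four values from `compute_pca` and pass the ratios, not the raw components, on to plotting. A hedged usage sketch against the updated signatures (the data here is hypothetical):

```python
import pandas as pd

from eis_toolkit.exploratory_analyses.pca import compute_pca, plot_pca

df = pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0], "b": [2.0, 1.0, 4.0, 3.0]})

# number_of_components may now be omitted: it defaults to the feature count.
transformed, components, variances, ratios = compute_pca(df)

# plot_pca takes the renamed explained_variance_ratios for axis labels.
grid = plot_pca(transformed, explained_variance_ratios=ratios)
```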
@@ -203,7 +216,7 @@ def plot_pca(
     Args:
         pca_df: A DataFrame containing computed principal components.
-        explained_variances: The explained variance ratios for each principal component. Used for labeling
+        explained_variance_ratios: The explained variance ratios for each principal component. Used for labeling
             axes in the plot. Optional parameter. Defaults to None.
         color_column_name: Name of the column that will be used for color-coding data points. Typically a
             categorical variable in the original data. Optional parameter, no colors if not provided.
@@ -226,8 +239,8 @@ def plot_pca(
     pair_grid = sns.pairplot(filtered_df, hue=color_column_name)

     # Add explained variances to axis labels if provided
-    if explained_variances is not None:
-        labels = [f"PC {i+1} ({var:.1f}%)" for i, var in enumerate(explained_variances * 100)]
+    if explained_variance_ratios is not None:
+        labels = [f"PC {i+1} ({var:.1f}%)" for i, var in enumerate(explained_variance_ratios * 100)]
     else:
         labels = [f"PC {i+1}" for i in range(len(pair_grid.axes))]
