Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Shepard Optimized Interpolation - Multiple Inputs Support #515

Merged
merged 11 commits into from
Jan 25, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ straightforward as possible.

### Added

- ENH: Shepard Optimized Interpolation - Multiple Inputs Support [#515](https://github.com/RocketPy-Team/RocketPy/pull/515)
- ENH: Argument for Optional Mutation on Function Discretize [#519](https://github.com/RocketPy-Team/RocketPy/pull/519)

### Changed
Expand Down
83 changes: 44 additions & 39 deletions rocketpy/mathutils/function.py
phmbressan marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -483,25 +483,9 @@ def get_value_opt(x):
return y

elif self.__interpolation__ == "shepard":
x_data = self.source[:, 0:-1] # Support for N-Dimensions
y_data = self.source[:, -1]
len_y_data = len(y_data) # A little speed up

# change the function's name to avoid mypy's error
def get_value_opt_multiple(*args):
x = np.array([[float(val) for val in args]])
sub_matrix = x_data - x
distances_squared = np.sum(sub_matrix**2, axis=1)

zero_distance_index = np.where(distances_squared == 0)[0]
if len(zero_distance_index) > 0:
return y_data[zero_distance_index[0]]

weights = distances_squared ** (-1.5)
numerator_sum = np.sum(y_data * weights)
denominator_sum = np.sum(weights)

return numerator_sum / denominator_sum
return self.__interpolate_shepard__(args)

get_value_opt = get_value_opt_multiple

Expand Down Expand Up @@ -903,28 +887,8 @@ def get_value(self, *args):

# Returns value for shepard interpolation
elif self.__interpolation__ == "shepard":
if all(isinstance(arg, Iterable) for arg in args):
x = list(np.column_stack(args))
else:
x = [[float(x) for x in list(args)]]
ans = x
x_data = self.source[:, 0:-1]
y_data = self.source[:, -1]
for i, _ in enumerate(x):
numerator_sum = 0
denominator_sum = 0
for o, _ in enumerate(y_data):
sub = x_data[o] - x[i]
distance = (sub.dot(sub)) ** (0.5)
if distance == 0:
numerator_sum = y_data[o]
denominator_sum = 1
break
weight = distance ** (-3)
numerator_sum = numerator_sum + y_data[o] * weight
denominator_sum = denominator_sum + weight
ans[i] = numerator_sum / denominator_sum
return ans if len(ans) > 1 else ans[0]
return self.__interpolate_shepard__(args)

# Returns value for polynomial interpolation function type
elif self.__interpolation__ == "polynomial":
if isinstance(args[0], (int, float)):
Expand Down Expand Up @@ -1682,6 +1646,47 @@ def __interpolate_akima__(self):
coeffs[4 * i : 4 * i + 4] = np.linalg.solve(matrix, result)
self.__akima_coefficients__ = coeffs

def __interpolate_shepard__(self, args):
"""Calculates the shepard interpolation from the given arguments.
The shepard interpolation is computed by a inverse distance weighting
in a vectorized manner.

Parameters
----------
args : scalar, list
Values where the Function is to be evaluated.

Returns
-------
result : scalar, list
The result of the interpolation.
"""
x_data = self.source[:, 0:-1] # Support for N-Dimensions
y_data = self.source[:, -1]

arg_stack = np.column_stack(args)
arg_qty, arg_dim = arg_stack.shape
result = np.zeros(arg_qty)

# Reshape to vectorize calculations
x = arg_stack.reshape(arg_qty, 1, arg_dim)

sub_matrix = x_data - x
distances_squared = np.sum(sub_matrix**2, axis=2)

# Remove zero distances from further calculations
zero_distances = np.where(distances_squared == 0)
valid_indexes = np.ones(arg_qty, dtype=bool)
valid_indexes[zero_distances[0]] = False

weights = distances_squared[valid_indexes] ** (-1.5)
numerator_sum = np.sum(y_data * weights, axis=1)
denominator_sum = np.sum(weights, axis=1)
result[valid_indexes] = numerator_sum / denominator_sum
result[~valid_indexes] = y_data[zero_distances[1]]

return result if len(result) > 1 else result[0]

def __neg__(self):
"""Negates the Function object. The result has the same effect as
multiplying the Function by -1.
Expand Down
49 changes: 49 additions & 0 deletions tests/test_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,55 @@ def test_multivariable_dataset(a, b):
assert np.isclose(func(a, b), a + b, atol=1e-6)


@pytest.mark.parametrize(
"x,y,z_expected",
[
(1, 0, 0),
(0, 1, 0),
(0, 0, 1),
(0.5, 0.5, 1 / 3),
(0.25, 0.25, 25 / (25 + 2 * 5**0.5)),
([0, 0.5], [0, 0.5], [1, 1 / 3]),
],
)
def test_2d_shepard_interpolation(x, y, z_expected):
"""Test the shepard interpolation method of the Function class."""
# Test plane x + y + z = 1
source = [(1, 0, 0), (0, 1, 0), (0, 0, 1)]
func = Function(
source=source, inputs=["x", "y"], outputs=["z"], interpolation="shepard"
)
z = func(x, y)
z_opt = func.get_value_opt(x, y)
assert np.isclose(z, z_opt, atol=1e-8).all()
assert np.isclose(z_expected, z, atol=1e-8).all()


@pytest.mark.parametrize(
"x,y,z,w_expected",
[
(0, 0, 0, 1),
(1, 0, 0, 0),
(0, 1, 0, 0),
(0, 0, 1, 0),
(0.5, 0.5, 0.5, 1 / 4),
(0.25, 0.25, 0.25, 0.700632626832),
([0, 0.5], [0, 0.5], [0, 0.5], [1, 1 / 4]),
],
)
def test_3d_shepard_interpolation(x, y, z, w_expected):
"""Test the shepard interpolation method of the Function class."""
# Test plane x + y + z + w = 1
source = [(1, 0, 0, 0), (0, 1, 0, 0), (0, 0, 1, 0), (0, 0, 0, 1)]
func = Function(
source=source, inputs=["x", "y", "z"], outputs=["w"], interpolation="shepard"
)
w = func(x, y, z)
w_opt = func.get_value_opt(x, y, z)
assert np.isclose(w, w_opt, atol=1e-8).all()
assert np.isclose(w_expected, w, atol=1e-8).all()


@pytest.mark.parametrize("a", [-1, -0.5, 0, 0.5, 1])
@pytest.mark.parametrize("b", [-1, -0.5, 0, 0.5, 1])
def test_multivariable_function(a, b):
Expand Down
Loading