Skip to content

Commit

Permalink
Add FEniCS and Dolfin-adjoint based code for solving optimal transpor…
Browse files Browse the repository at this point in the history
…t problems.
  • Loading branch information
meg-simula committed Feb 6, 2024
0 parents commit 8007bb2
Show file tree
Hide file tree
Showing 5 changed files with 611 additions and 0 deletions.
16 changes: 16 additions & 0 deletions README
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Dependencies:
# Python3
# fenicsproject (stable or dev)
# numpy

# fenicsproject run stable
# fenicsproject run dev
fenicsproject run quay.io/dolfinadjoint/pyadjoint:latest

# Module files
cases.py
optimalflow.py

# Run test script
run_case.py

212 changes: 212 additions & 0 deletions cases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import os
import os.path
import csv
import numpy

from dolfin import *
from optimalflow import non_negativize

def hdf5write(name, field):
mesh = field.function_space().mesh()
file = HDF5File(mesh.mpi_comm(), name, "w")
file.write(field, "/function", 0)
file.close()

def csvwrite(name, values, header=None, debug=True, mode="w"):
if debug:
print(header)
print(" & ".join("$%.4f$" % v for v in values))

with open(name, mode) as f:
writer = csv.writer(f)
writer.writerow(header)
writer.writerow(values)

def generate_data_bump(mesh, g, D=0.0, r=0.0, noise=0.0, output=None):
"""Generate data for a relatively smooth test case with a bump moving
in the middle of the domain.
Domain: [0, 1]**2
c_0: Bump function
phi = (g, g)
Let c_1 be defined by one timestep of the convection-diffusion-reaction equation
c_t + div (phi c) - D div grad c + r c = 0
where c_0 and phi are as given, and D and r if given. Evolve by a
timestep tau = 1.0.
If outputdir is given as a string, write generated data to given
directory.
"""
#tau = Constant(1.0) # Time step
tau = 1.0 # Time step

Q = FunctionSpace(mesh, "CG", 1)

# Define initial c0
bump_x = "beta/(2*alpha*tgamma(1./beta))*std::exp(-pow((std::abs(x[0] - mu)/alpha), beta))"
bump_y = "beta/(2*alpha*tgamma(1./beta))*std::exp(-pow((std::abs(x[1] - mu)/alpha), beta))"
bump = Expression("1 + 1./7*%s*%s" % (bump_x, bump_y), degree=3, beta=8.0, alpha=0.2, mu=0.5)
c0 = interpolate(bump, Q)

# Define initial phi
V = VectorFunctionSpace(mesh, "CG", 1)
phi = Expression((str(g), str(g)), degree=1)
phi = interpolate(phi, V)

# Define variational problem for evolution of c
c = TrialFunction(Q)
d = TestFunction(Q)
a = (inner(c, d) + tau*inner(div(c*phi), d))*dx()
if D is not None and abs(D) > 0.0:
a += tau*inner(D*grad(c), grad(d))*dx()
if r is not None and abs(r) > 0.0:
a += tau*r*c*d*dx()

L = inner(c0, d)*dx()
A = assemble(a)
b = assemble(L)

if D is not None and D > 0:
bc = DirichletBC(Q, c0, "on_boundary")
bc.apply(A, b)

# Solve it
c1 = Function(Q)
solve(A, c1.vector(), b, "mumps")

# Add noise to the concentrations:
if noise:
N = c0.vector().size()
r0 = numpy.random.normal(0, noise, N)
r1 = numpy.random.normal(0, noise, N)
c0.vector()[:] += r0
c1.vector()[:] += r1

# Zero any negative values
c0 = non_negativize(c0)
c1 = non_negativize(c1)

# Optional: Store solutions to .h5 and .pvd format
if output is not None:
filename = lambda f: os.path.join(output, "data", f)
hdf5write(filename("c0_e.h5"), c0)
hdf5write(filename("c1_e.h5"), c1)
hdf5write(filename("phi_e.h5"), phi)

file = File(filename("c_e.pvd"))
file << c0
file << c1
file = File(filename("phi_e.pvd"))
file << phi

csvwrite(filename("info_e.csv"), (float(tau), D, g), header=("tau", "D", "g"))

return (c0, c1, phi, tau)

def generate_data_bdry(mesh, g, D=0.0, r=0.0, noise=0.0, tau=1.0, output=None):
"""Generate data for a test case with a concentration appearing on the boundary.
Domain: Given
c_0: 1.0 on the left and lower boundary if d = 2, on_boundary iff d=3
phi = (g, 0) if d == 2, (g, 0, 0) iff d == 3
Let c_1 be defined by one timestep of the convection-diffusion-reaction equation
c_t + div (phi c) - D div grad c + r c = 0
where c_0 and phi are as given, and D and r if given. Evolve by a
timestep tau = 1.0.
If outputdir is given as a string, write generated data to given
directory.
"""
tau = Constant(tau) # Time step

Q = FunctionSpace(mesh, "CG", 1)
dim = mesh.topology().dim()
if dim == 2:
bc = DirichletBC(Q, 1.0, "near(x[0], 0.0) || near(x[1], 0.0)", "pointwise")
else:
bc = DirichletBC(Q, 1.0, "on_boundary")
c_ = Function(Q)
bc.apply(c_.vector())

# Define initial phi
V = VectorFunctionSpace(mesh, "CG", 1)
if dim == 2:
phi = Expression((str(g), "0.0"), degree=1)
else:
phi = Expression((str(g), "0.0", "0.0"), degree=1)
phi = interpolate(phi, V)

# Define variational problem for evolution of c
c = TrialFunction(Q)
d = TestFunction(Q)
a = (inner(c, d) + tau*inner(div(c*phi), d))*dx()
if D is not None and abs(D) > 0.0:
a += tau*inner(D*grad(c), grad(d))*dx()
if r is not None and abs(r) > 0.0:
a += tau*r*c*d*dx()

L = inner(c_, d)*dx()
A = assemble(a)
b = assemble(L)

if D is not None and D > 0:
bc = DirichletBC(Q, c_, "on_boundary")
bc.apply(A, b)

# Solve it for c0
c0 = Function(Q)
solve(A, c0.vector(), b, "mumps")

# Evolve further to c1
c_.assign(c0)
b = assemble(L)
if D is not None and D > 0:
bc = DirichletBC(Q, c0, "on_boundary")
bc.apply(A, b)

c1 = Function(Q)
solve(A, c1.vector(), b, "mumps")

# Add noise to the concentrations:
if noise:
N = c0.vector().size()
r0 = numpy.random.normal(0, noise, N)
r1 = numpy.random.normal(0, noise, N)
c0.vector()[:] += r0
c1.vector()[:] += r1

# Zero any negative values
c0 = non_negativize(c0)
c1 = non_negativize(c1)

# Optional: Store solutions to .h5 and .pvd format
if output is not None:
filename = lambda f: os.path.join(output, f)
hdf5write(filename("c0_e.h5"), c0)
hdf5write(filename("c1_e.h5"), c1)
delta_c = project(c1 - c0, c0.function_space())
hdf5write(filename("delta_c_e.h5"), delta_c)
hdf5write(filename("phi_e.h5"), phi)

file = File(filename("c_e.pvd"))
file << c0
file << c1
file << delta_c
file = File(filename("phi_e.pvd"))
file << phi

csvwrite(filename("info_e.csv"), (float(tau), g, D, r), header=("tau", "g", "D", "r"))

return (c0, c1, phi, tau)

if __name__ == "__main__":

mesh = UnitSquareMesh(32, 32)
(c0, c1, phi, tau) = generate_data_bump(mesh, 0.02, output="tmp-bump")
(c0, c1, phi, tau) = generate_data_bdry(mesh, 0.02, output="tmp-bdry")
160 changes: 160 additions & 0 deletions optimalflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import numpy
import pylab
from dolfin import *

def non_negativize(I):
"""Given a scalar Function c, return the field with negative values
replaced by zero.
"""
c = I.copy(deepcopy=True)
vec = c.vector().get_local()
# Extract dof indices where the vector values are negative
negs = numpy.where(vec < 0)[0]
# Set these values to zero
c.vector()[negs] = 0.0
return c

def compute_omt(I0, I1, tau, alpha, space="CG", adjust=False, threshold=1.e-3):
"""Given two image intensity fields I0 and I1, a time step tau > 0 and
a regularization parameter alpha > 0, return a vector field phi
such that
I_t + div (phi I) \approx 0 in \Omega
where I(t = 0) = I0 and I(t = tau) = I1.
Mueller, M., Karasev, P., Kolesov, I. & Tannenbaum, A. Optical
flow estimation for flame detection in videos. IEEE Transactions on
image processing, 22, 2786–2797
If adjust is given as True, filter the input values by replacing
negative values by zero,
Moreover, set ident_zeros for those rows for which no values
exceed the given threshold.
"""

# Map negative values to zero in I0 and I1:
if adjust:
info("Adjusting negative input values to zero.")
I0 = non_negativize(I0)
I1 = non_negativize(I1)

# Check balance of mass assumption
m1 = assemble(I0*dx())
m2 = assemble(I1*dx())
info("OMT: Mass preserved (\int I0 == \in I1)? I0 = %.5g, I1 = %0.5g" % (m1, m2))

Q = I0.function_space()
mesh = Q.mesh()
d = mesh.topology().dim()

if space == "RT":
V = FunctionSpace(mesh, "RT", 1)
else:
V = VectorFunctionSpace(mesh, "CG", 1)

# Define dI and barI
dI = (I1 - I0)/tau
barI = 0.5*(I0 + I1)
I = barI

# Define OMT variational forms
phi = TrialFunction(V)
psi = TestFunction(V)
a = (inner(div(I*phi), div(I*psi)) + alpha*inner(I*phi, psi))*dx()
L = - inner(dI, div(I*psi))*dx()

# Assemble linear system
A = assemble(a)
b = assemble(L)

# Set zero (tol) rows to identity as relevant
A.ident_zeros(tol=threshold)

# Solve the linear system
phi = Function(V)
solve(A, phi.vector(), b, "mumps")

return phi

def element_cell(mesh):
"Return the finite element cell."
return triangle if mesh.topology().dim() == 2 else tetrahedron

def compute_ocd(c1, c2, tau, D, alpha):

mesh = c1.function_space().mesh()

cell = element_cell(mesh)
C = FiniteElement("CG", cell, 1) # H1
Y = FiniteElement("CG", cell, 1) # H10
Q = VectorElement("CG", cell, 1) # H1 (H(div) formulation also natural)
Mx = MixedElement([C, Y, Q])
M = FunctionSpace(mesh, Mx)

m = Function(M)
(c, y, phi) = split(m)
(d, z, psi) = TestFunctions(M)

F1 = (c*d + (1./tau*d + div(d*phi))*y + inner(D*grad(d), grad(y)) - c2*d)*dx()
F2 = (div(c*psi)*y + alpha*(inner(phi, psi) + inner(grad(phi), grad(psi))))*dx()
F3 = ((1./tau*c + div(c*phi))*z + inner(D*grad(c), grad(z)) - 1./tau*c1*z)*dx()
F = F1 + F2 + F3

assign(m.sub(0), c2)

#set_log_level(LogLevel.DEBUG)

bcs = [DirichletBC(M.sub(0), c2, "on_boundary"),
DirichletBC(M.sub(1), 0.0, "on_boundary")]

#info(NonlinearVariationalSolver.default_parameters(), True)
ps = {"nonlinear_solver": "snes", "snes_solver": {"linear_solver": "mumps", "maximum_iterations": 50, "relative_tolerance": 1.e-10}}
solve(F == 0, m, bcs, solver_parameters=ps)
(c, y, phi) = m.split(deepcopy=True)

return (c, y, phi)

def compute_magnitude_field(phi):

# Split phi into its components (really split it)
phis = phi.split(deepcopy=True)

# Get the map from vertex index to dof
v2d = [vertex_to_dof_map(phii.function_space()) for phii in phis]
dim = len(v2d)

# Create array of component values at vertex indices so p0[0] is
# value at vertex 0 e.g.
if dim == 2:
p0 = phis[0].vector().get_local(v2d[0])
p1 = phis[1].vector().get_local(v2d[1])
# Take element-wise magnitude
m = numpy.sqrt((numpy.square(p0) + numpy.square(p1)))
else:
p0 = phis[0].vector().get_local(v2d[0])
p1 = phis[1].vector().get_local(v2d[1])
p2 = phis[2].vector().get_local(v2d[2])
# Take element-wise magnitude
m = numpy.sqrt((numpy.square(p0) + numpy.square(p1) + numpy.square(p2)))

# Map array of values at vertices back to values at dofs and
# insert into magnitude field.
M = phis[0].function_space()
magnitude = Function(M)
d2v = dof_to_vertex_map(M)
magnitude.vector()[:] = m[d2v]
return magnitude

if __name__ == "__main__":
mesh = UnitSquareMesh(4, 4)
V = VectorFunctionSpace(mesh, "CG", 1)
phi = Function(V)
phi.vector()[1] = 2.0
phi.vector()[0] = 2.0
a = compute_magnitude_field(phi)
print(a.vector().max())
print(a.vector().size())
Loading

0 comments on commit 8007bb2

Please sign in to comment.