Skip to content

Commit

Permalink
Initial commit before further development of gnss module
Browse files Browse the repository at this point in the history
  • Loading branch information
fedora Cloud User committed Jul 1, 2024
1 parent e21315c commit ffe59fa
Show file tree
Hide file tree
Showing 2 changed files with 331 additions and 1 deletion.
Empty file removed blah.py
Empty file.
332 changes: 331 additions & 1 deletion geodepy/gnss.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"""

from datetime import datetime
import numpy as np
import pandas as pd
from numpy import zeros, delete
from geodepy.angles import DMSAngle
import re
Expand Down Expand Up @@ -99,6 +101,27 @@ def read_sinex_header_line(file):

return header_line

def read_sinex_custom(fp, start_line, end_line):
"""Read custom line range from SINEX.
:param str fp: Path to SINEX file.
:param int start_line: Beginning line number.
:param int end_line: Finishing line number.
:return list custom: List of string(s).
"""

# Setup
i = start_line
j = end_line
f = fp
custom = []

with open(fp) as f:
for line in f.readlines()[i-1:j]:
custom.append(line.strip())

# Return
return custom

def read_sinex_estimate(file):
"""This function reads in the SOLUTION/ESTIMATE block of a SINEX file. It
Expand Down Expand Up @@ -748,7 +771,8 @@ def remove_velocity_sinex(sinex):

# From header, confirm that the SINEX has velocity parameters
header = read_sinex_header_line(sinex)
if header[70:71] == 'V':
header = header.strip()
if header[-1] == 'V':
pass
else:
print("Not removing velocities because SINEX file does not have velocity parameters.")
Expand All @@ -770,6 +794,7 @@ def remove_velocity_sinex(sinex):
header = header.replace(str(old_num_params), str(num_params))
header = header.replace('V', '')
out.write(header)
out.write("\n")
del header

out.write("*-------------------------------------------------------------------------------\n")
Expand Down Expand Up @@ -988,3 +1013,308 @@ def remove_matrixzeros_sinex(sinex):
out.write('%ENDSNX\n')

return


def sinex2dataframe_solution_estimate(fp):
"""This function reads in a SINEX file and returns
a dataframe of SOLUTION/ESTIMATE block only.
:param str sinex: path of input SINEX file
return: dataframe
"""

# Get lines
lines = read_sinex_solution_estimate_block(fp)

# Remove non-data lines
for l in lines[:]:
if l.startswith('*'):
lines.remove(l)
if l.startswith('+'):
lines.remove(l)
if l.startswith('-'):
lines.remove(l)

# Split by column
lines = [i.split() for i in lines]

# Isolate into vectors
row = np.int_(list(zip(*lines))[0])
par = list(zip(*lines))[1]
code = list(zip(*lines))[2]
pt = list(zip(*lines))[3]
soln = list(zip(*lines))[4]
refEpoch = list(zip(*lines))[5]
unit = list(zip(*lines))[6]
s = np.int_(list(zip(*lines))[7])
est = np.float_(list(zip(*lines))[8])
sigma = np.float_(list(zip(*lines))[9])

# Organise into DataFrame
dict_temp = {
"row":row,
"par":par,
"code":code,
"pt":pt,
"soln":soln,
"refEpoch":refEpoch,
"unit":unit,
"s":s,
"est":est,
"sigma":sigma,
}
df = pd.DataFrame(dict_temp)

# Return
return df

def sinex2dataframe_solution_matrix_estimate(fp):
"""This function reads in a SINEX file and returns
a dataframe of SOLUTION/MATRIX_ESTIMATE block only.
:param str sinex: path of input SINEX file
return: dataframe
"""

# Get lines
lines = read_sinex_solution_matrix_estimate_block(fp)

# Remove non-data lines
for l in lines[:]:
if l.startswith('*'):
lines.remove(l)
if l.startswith('+'):
lines.remove(l)
if l.startswith('-'):
lines.remove(l)

# Split by column
lines = [i.split() for i in lines]

# Pad out lines with NaN
for i in range(len(lines)):
if len(lines[i])==5:
continue
if len(lines[i]) == 4:
lines[i].append(np.nan)
if len(lines[i]) == 3:
lines[i].append(np.nan)
lines[i].append(np.nan)

# Isolate into vectors
row = np.int_(list(zip(*lines))[0])
col = np.int_(list(zip(*lines))[1])
q1 = np.float_(list(zip(*lines))[2])
q2 = np.float_(list(zip(*lines))[3])
q3 = np.float_(list(zip(*lines))[4])

# Organise into DataFrame
dict_temp = {
"row":row,
"col":col,
"q1":q1,
"q2":q2,
"q3":q3,
}
df = pd.DataFrame(dict_temp)

# Return
return df

def dataframe2sinex_solution_estimate(df):
"""This function reads in a dataframe of the
SOLUTIONS/ESTIMATE block from a SINEX, the
converts each row to a string in a list
ready for writing to SINEX.
:param dataframe df: dataframe of SOLUTION/ESTIMATE block
return: list of strings
"""

lines =lines = ["+SOLUTION/ESTIMATE"]
lines.append("*INDEX TYPE__ CODE PT SOLN _REF_EPOCH__ UNIT S __ESTIMATED VALUE____ _STD_DEV___")

for i in range(len(df.code)):
l = " %5i %s %s %s %s %s %-3s %s %21.14e %.5e" % (df.row.iloc[i], df.par.iloc[i], df.code.iloc[i], df.pt.iloc[i], df.soln.iloc[i], df.refEpoch.iloc[i], df.unit.iloc[i], df.s.iloc[i], df.est.iloc[i], df.sigma.iloc[i])
lines.append(l)

lines.append("-SOLUTION/ESTIMATE")

# Return
return lines


def dataframe2matrix_snx_vcv(df):

# Matrix dimensions
# - To Do: Handle check for velocities
# - To Do: Handle lower/upper triangle
par = 3 # (check for velocities)
n = len(df.code)*par
Q = np.zeros((n, n))

# Matrix elements and rows of dataframe
i = 0
j = 0
r = 0
while r < len(df.code):

# variances
Q[i+0, j+0] = df.xx[r]
Q[i+1, j+1] = df.yy[r]
Q[i+2, j+2] = df.zz[r]

# covariances
Q[i+1, j] = df.xy[r]
Q[i+2, j] = df.xz[r]
Q[i+2, j+1] = df.yz[r]

# Counter
r = r+1
i = i+3
j = j+3

# Return
return Q

def dataframe2matrix_solution_matrix_estimate(df):

# Matrix size
n = df.row.iloc[-1]
Q = np.zeros((n, n))

for i in range(len(df.row)):

# Get matrix indices
row = int(df.row[i])
col = int(df.col[i])

# Fill PARA2+0
q1 = df.q1[i]
Q[row-1, col-1] = q1
Q[col-1, row-1] = q1

# Fill PARA2+1
q2 = df.q2[i]
Q[row-1, col] = q2
Q[col, row-1] = q2

# Fill PARA2+2
q3 = df.q3[i]
Q[row-1, col+1] = q3
Q[col+1, row-1] = q3

# Return
return Q

def matrix2dataframe_solution_matrix_estimate(m, par, tri):

# Matrix dimensions
# - To Do: Handle check for velocities
# - To Do: Handle lower/upper triangle
Q = m
par = 3
tri = 'lower'
n = np.shape(Q)[0]

row = []
col = []
q1 = []
q2 = []
q3 = []
i = 0
j = 0
while i < n:
while j <= i:
row.append(i+1)
col.append(j+1)
q1.append(Q[i, j])
q2.append(Q[i, j+1])
q3.append(Q[i, j+2])
j += par
j = 0
i += 1

dict_temp = {
"row":row,
"col":col,
"q1":q1,
"q2":q2,
"q3":q3,
}
df = pd.DataFrame(dict_temp)
df.replace(0.0, np.nan, inplace=True)

# Return
return df

def dataframe2sinex_solution_matrix_estimate(df):
"""This function reads in a dataframe of the
SOLUTIONS/MATRIX_ESTIMATE block from a SINEX, the
converts each row to a string in a list
ready for writing to SINEX.
:param dataframe df: dataframe of SOLUTION/ESTIMATE block
return: list of strings
"""

lines =lines = ["+SOLUTION/MATRIX_ESTIMATE L COVA"]
lines.append("*PARA1 PARA2 ____PARA2+0__________ ____PARA2+1__________ ____PARA2+2__________")

for i in range(len(df.row)):
l = " %6i %5i %.14e %.14e %.14e" % (df.row.iloc[i], df.col.iloc[i], df.q1.iloc[i], df.q2.iloc[i], df.q3.iloc[i])
l = l.replace('nan', '')
lines.append(l)

lines.append("-SOLUTION/MATRIX_ESTIMATE L COVA")

# Return
return lines

def writeSINEX(fp, header, comment, SiteID, SolutionEpochs, SolutionEstimate, SolutionMatrixEstimate):

# Open File
with open(fp, 'w') as f:

# Header
f.write("{}".format(header))

f.write("*-------------------------------------------------------------------------------\n")

# Comment
for i in range(len(comment)):

f.write("{}\n".format(comment[i]))

f.write("*-------------------------------------------------------------------------------\n")

#SITE/ID
for i in range(len(SiteID)):

f.write("{}".format(SiteID[i]))

f.write("*-------------------------------------------------------------------------------\n")

# SOLUTION/EPOCHS
for i in range(len(SolutionEpochs)):

f.write("{}".format(SolutionEpochs[i]))

f.write("*-------------------------------------------------------------------------------\n")

# SOLUTION/ESTIMATE
for i in range(len(SolutionEstimate)):

f.write("{}\n".format(SolutionEstimate[i]))

f.write("*-------------------------------------------------------------------------------\n")

# SOLUTION/MATRIX_ESTIMATE
for i in range(len(SolutionMatrixEstimate)):

f.write("{}".format(SolutionMatrixEstimate[i]))

# End Line
f.write("%ENDSNX")

f.close()

0 comments on commit ffe59fa

Please sign in to comment.