"""
OceanSpy utilities that don't need OceanDataset objects.
"""
import copy as _copy
import numpy as _np
# Required dependencies (private)
import xarray as _xr
# from .llc_rearrange import face_edge_check
from ._ospy_utils import _check_instance
# Recommended dependencies (private)
try:
from geopy.distance import great_circle as _great_circle
except ImportError: # pragma: no cover
pass
def viewer2range(p):
"""
Takes the output from the poseidon viewer `p` and returns the coordinate
trajectories in X and Y in a way that is compatible with oceanspy.subsample
functions.
Parameters
----------
p: list.
data exported from the Poseidon viewer
Returns
-------
lon: list.
lat: list.
"""
_check_instance({"p": p}, {"p": ["dict"]})
if not set(["type", "features"]) == p.keys():
raise TypeError("not a type extracted by poseidon viewer")
if len(p["features"]) == 0:
raise ValueError("empty data collection")
else:
Time = p["features"][0]["properties"]
timeRange = [Time["timeFrom"], Time["timeTo"]]
fs, nfs = p["features"], len(p["features"])
types = [fs[i]["geometry"]["type"] for i in range(nfs)]
if len(set(types)) > 1:
raise ValueError("too many geometry types in the collection")
p_type = fs[0]["geometry"]["type"]
print("extracting " + p_type)
if p_type == "Polygon":
coords = fs[0]["geometry"]["coordinates"][0]
if p_type == "LineString": # pragma : no cover
coords = fs[0]["geometry"]["coordinates"]
if p_type == "Point":
coords = []
for i in range(len(fs)):
coords.append(fs[i]["geometry"]["coordinates"])
lon = []
lat = []
for i in range(len(coords)):
lon.append(coords[i][0])
lat.append(coords[i][1])
# check that there are no lon values greater than 180 (abs)
ll = _np.where(abs(_np.array(lon)) > 180)[0]
if ll.size:
lon = _np.array(lon)
sign = _np.sign(lon[ll])
fac = _np.round(abs(lon)[ll] / 360)
nlon = lon[ll] - 360 * sign * fac
lon[ll] = nlon
return timeRange, lat, list(lon)
def _rel_lon(x, ref_lon):
"""
Change the definition of 0 longitude.
Return how much east one need to go from ref_lon to x
This function aims to address the confusion caused by
the discontinuity in longitude.
Parameters
----------
x: float, numpy.array, dask.array
longitude to convert
ref_lon: float
a reference longitude. This longitude will become 0deg
Returns
-------
redefined_x: float, numpy.array, dask.array
converted longitude
"""
return (x - ref_lon) % 360
def _reset_range(xn):
"""Resets the definition of XRange, by default the discontinuity at 180 long.
Checks that there is no sign change in x and if there is, the only change that
is allowed is when crossing zero. Otherwise resets ref_lon.
Parameters
----------
x: numpy.array.
array with longitude values.
Returns
-------
x0, x1: numpy.array
endpoints of cutout.
redefined_x: numpy.array.
converted longitude.
"""
_ref_lon = 180
xn = _np.array(xn)
cross = _np.where(_np.diff(_np.sign(xn)))[0]
if cross.size and xn.size != 2:
ref = 180, 0
if cross.size == 1: # one sign change
d1 = [abs(abs(xn[cross[0]]) - i) for i in ref]
i0 = _np.argwhere(_np.array(d1) == min(d1))[0][0]
if i0 == 0: # Pacific
ll = _np.where(xn > 0)[0]
nxn = _copy.deepcopy(xn)
nxn[ll] = nxn[ll] - 360
X = _np.min(nxn) + 360, _np.max(nxn)
_ref_lon = X[0] - (X[0] - X[1]) / 3
else: # Atlantic
X = _np.min(xn), _np.max(xn)
if cross.size > 1: # 2 or more sign changes
da = [abs(abs(xn[i]) - 180) for i in cross]
db = [abs(abs(xn[i]) - 0) for i in cross]
d = _np.array([[da[i], db[i]] for i in range(len(da))])
ind = [_np.argwhere(d[i] == min(d[i]))[0][0] for i in range(len(d))]
if all(ind[0] == i for i in ind) and ind[0] == 0: # Pacific
ll = _np.where(xn > 0)[0]
nxn = _copy.deepcopy(xn)
nxn[ll] = nxn[ll] - 360
X = _np.min(nxn) + 360, _np.max(nxn)
_ref_lon = X[0] - (X[0] - X[1]) / 3
if all(ind[0] == i for i in ind) and ind[0] == 1: # Atlantic
X = _np.min(xn), _np.max(xn)
else:
X = None
if cross.size == 0 or xn.size == 2:
if xn.size == 2:
X = xn[0], xn[1]
if xn[0] > xn[1]:
_ref_lon = X[0] - (X[0] - X[1]) / 3
else:
X = _np.min(xn), _np.max(xn)
if X is not None:
X = _np.array(X)
return X, _np.round(_ref_lon, 2)
[docs]
def spherical2cartesian(Y, X, R=None):
"""
Convert spherical coordinates to cartesian.
Parameters
----------
Y: _xr.DataArray or array_like
Spherical Y coordinate (latitude)
X: _xr.DataArray or array_like
Spherical X coordinate (longitude)
R: scalar
Earth radius in km
If None, use geopy default
Returns
-------
x: _xr.DataArray or array_like
Cartesian x coordinate
y: _xr.DataArray or array_like
Cartesian y coordinate
z: scalar
Cartesian z coordinate
"""
# Check parameters
_check_instance({"R": R}, {"R": ["type(None)", "numpy.ScalarType"]})
# Check parameters
if R is None:
from geopy.distance import EARTH_RADIUS
R = EARTH_RADIUS
# Convert
Y_rad = _np.deg2rad(Y)
X_rad = _np.deg2rad(X)
x = R * _np.cos(Y_rad) * _np.cos(X_rad)
y = R * _np.cos(Y_rad) * _np.sin(X_rad)
z = R * _np.sin(Y_rad)
return x, y, z
[docs]
def great_circle_path(lat1, lon1, lat2, lon2, delta_km=None, R=None):
"""
Generate a great circle trajectory specifying the distance resolution.
Parameters
----------
lat1: scalar
Latitude of vertex 1 [degrees N]
lon1: scalar
Longitude of vertex 1 [degrees E]
lat2: scalar
Latitude of vertex 2 [degrees N]
lon2: scalar
Longitude of vertex 2 [degrees E]
delta_km: scalar, None
Distance resolution [km]
If None, only use vertices and return distance
R: scalar, None
Earth radius in km
If None, use geopy default
Returns
-------
lats: 1D numpy.ndarray
Great circle latitudes [degrees N]
lons: 1D numpy.ndarray
Great circle longitudes [degrees E]
dist: 1D numpy.ndarray
Distances from vertex 1 [km]
References
----------
Converted to python and adapted from:
`<https://ww2.mathworks.cn/matlabcentral/mlc-downloads/downloads/
submissions/8493/versions/2/previews/generate_great_circle_path.m
/index.html?access_key=>`_
"""
# Check parameters
_check_instance(
{
"lat1": lat1,
"lon1": lon1,
"lat2": lat2,
"lon2": lon2,
"delta_km": delta_km,
"R": R,
},
{
"lat1": "numpy.ScalarType",
"lon1": "numpy.ScalarType",
"lat2": "numpy.ScalarType",
"lon2": "numpy.ScalarType",
"delta_km": ["type(None)", "numpy.ScalarType"],
"R": ["type(None)", "numpy.ScalarType"],
},
)
# Check parameters
if lat1 == lat2 and lon1 == lon2:
raise ValueError("Vertexes are overlapping")
if delta_km is not None and delta_km <= 0:
raise ValueError("`delta_km` can not be zero or negative")
# Check parameters
if R is None:
from geopy.distance import EARTH_RADIUS
R = EARTH_RADIUS
if delta_km is not None:
# Convert to radians
lat1 = _np.deg2rad(lat1)
lon1 = _np.deg2rad(lon1)
lat2 = _np.deg2rad(lat2)
lon2 = _np.deg2rad(lon2)
# Using the right-handed coordinate frame
# with Z toward (lat,lon)=(0,0) and
# Y toward (lat,lon)=(90,0),
# the unit_radial of a (lat,lon) is given by:
# [ cos(lat)*sin(lon) ]
# unit_radial = [ sin(lat) ]
# [ cos(lat)*cos(lon) ]
unit_radial_1 = _np.array(
[
_np.cos(lat1) * _np.sin(lon1),
_np.sin(lat1),
_np.cos(lat1) * _np.cos(lon1),
]
)
unit_radial_2 = _np.array(
[
_np.cos(lat2) * _np.sin(lon2),
_np.sin(lat2),
_np.cos(lat2) * _np.cos(lon2),
]
)
# Define the vector that is normal to
# both unit_radial_1 & unit_radial_2
normal_vec = _np.cross(unit_radial_1, unit_radial_2)
unit_normal = normal_vec / _np.sqrt(_np.sum(normal_vec**2))
# Define the vector that is tangent to the great circle flight path at
# (lat1,lon1)
tangent_1_vec = _np.cross(unit_normal, unit_radial_1)
unit_tangent_1 = tangent_1_vec / _np.sqrt(_np.sum(tangent_1_vec**2))
# Determine the total arc distance from 1 to 2 (radians)
total_arc_angle_1_to_2 = _np.arccos(unit_radial_1.dot(unit_radial_2))
# Determine the set of arc angles to use
# (approximately spaced by delta_km)
angs2use = _np.linspace(
0,
total_arc_angle_1_to_2,
int(_np.ceil(total_arc_angle_1_to_2 / (delta_km / R))),
) # radians
M = angs2use.size
# unit_radials is a 3xM array of M unit radials
term1 = _np.array([unit_radial_1]).transpose().dot(_np.ones((1, M)))
term2 = _np.ones((3, 1)).dot(_np.array([_np.cos(angs2use)]))
term3 = _np.array([unit_tangent_1]).transpose().dot(_np.ones((1, M)))
term4 = _np.ones((3, 1)).dot(_np.array([_np.sin(angs2use)]))
unit_radials = term1 * term2 + term3 * term4
# Convert to latitudes and longitudes
lats = _np.rad2deg(_np.arcsin(unit_radials[1, :]))
lons = _np.rad2deg(_np.arctan2(unit_radials[0, :], unit_radials[2, :]))
else:
# Use input lon and lat
lats = _np.concatenate((_np.reshape(lat1, 1), _np.reshape(lat2, 1)))
lons = _np.concatenate((_np.reshape(lon1, 1), _np.reshape(lon2, 1)))
# Compute distance
dists = _np.zeros(lons.shape)
for i in range(1, len(lons)):
coord1 = (lats[i - 1], lons[i - 1])
coord2 = (lats[i], lons[i])
dists[i] = _great_circle(coord1, coord2, radius=R).km
dists = _np.cumsum(dists)
return lats, lons, dists
def circle_path_array(_Y, _X, R, _res=25):
"""
Given a list of prescribed coordinate points of len(N), returns an array of
increased (or equal) length, wherein the added values are computed following
a great circle path. The spatial resolution is set by _res=25 (by default) in
km=25.
"""
# Check parameters
_check_instance(
{
"lats": _Y,
"lons": _X,
"res": _res,
"R": R,
},
{
"lats": ["list", "numpy.ndarray"],
"lons": ["list", "numpy.ndarray"],
"res": "numpy.ScalarType",
"R": "numpy.ScalarType",
},
)
_Y = _np.asarray(_Y, dtype=_np.float64)
_X = _np.asarray(_X, dtype=_np.float64)
if len(_Y) != len(_X):
raise ValueError("_Y and _X arrays must have same number of entries")
diffsum = abs(_X[1:] - _X[:-1]) + abs(_Y[1:] - _Y[:-1])
if _np.sum(diffsum) == 0:
raise ValueError("There must be at least two different coordinates points")
dists = []
for i in range(len(_Y) - 1):
dists.append(_great_circle((_Y[i], _X[i]), (_Y[i + 1], _X[i + 1]), radius=R).km)
k = _np.argwhere(_np.array(dists) > _res).squeeze()
nY = [[] * k.size for i in range(k.size)]
nX = [[] * k.size for i in range(k.size)]
ndists = []
if k.size:
if k.size == 1: # only one element
nY, nX, ndists = great_circle_path(_Y[k], _X[k], _Y[k + 1], _X[k + 1], _res)
if len(nY) == 2:
nY, nX, ndists = great_circle_path(
_Y[k], _X[k], _Y[k + 1], _X[k + 1], _res / 2
)
else:
for ii in range(k.size):
ny, nx, dists = great_circle_path(
_Y[k[ii]], _X[k[ii]], _Y[k[ii] + 1], _X[k[ii] + 1], _res
)
if len(ny) == 2:
ny, nx, dists = great_circle_path(
_Y[k[ii]], _X[k[ii]], _Y[k[ii] + 1], _X[k[ii] + 1], _res / 2
)
nY[ii] = list(ny)
nX[ii] = list(nx)
Nn = [len(nY[i]) for i in range(len(nY))]
nk = 0 * k
for ii in range(len(k)):
nk[ii] = k[ii] + sum(Nn[:ii])
_Y = list(_Y[: nk[ii] + 1]) + list(nY[ii]) + list(_Y[nk[ii] + 1 :])
_X = list(_X[: nk[ii] + 1]) + list(nX[ii]) + list(_X[nk[ii] + 1 :])
nY, nX = _Y, _X # update
else:
nY, nX = _Y, _X
return nY, nX
[docs]
def cartesian_path(x1, y1, x2, y2, delta=None):
"""
Generate a trajectory specifying the distance resolution.
Parameters
----------
x1: scalar
x coordinate of vertex 1
y1: scalar
y coordinate of vertex 1
x2: scalar
x coordinate of vertex 2
y2: scalar
y coordinate of vertex 2
delta: scalar
Distance resolution (same units of x and y)
If None, only use vertices and return distance
Returns
-------
xs: 1D numpy.ndarray
x coordinates
ys: 1D numpy.ndarray
y coordinates
dist: 1D numpy.ndarray
distances
"""
# Check parameters
_check_instance(
{"x1": x1, "x2": x2, "y1": y1, "y2": y2, "delta": delta},
{
"x1": "numpy.ScalarType",
"x2": "numpy.ScalarType",
"y1": "numpy.ScalarType",
"y2": "numpy.ScalarType",
"delta": ["type(None)", "numpy.ScalarType"],
},
)
if x1 == x2 and x1 == x2:
raise ValueError("Vertexes are overlapping")
if delta is not None and delta <= 0:
raise ValueError("`delta` can not be zero or negative")
# Interpolate
dist_tot = _np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
if delta is None:
coefs = _np.linspace(0, 1, 2)
else:
coefs = _np.linspace(0, 1, int(round(dist_tot / delta)))
xs = x1 + coefs * (x2 - x1)
ys = y1 + coefs * (y2 - y1)
dists = coefs * dist_tot
return xs, ys, dists
[docs]
def densjmd95(s, t, p):
"""
Density of Sea Water using Jackett and McDougall 1995 (JAOT 12)
polynomial (modified UNESCO polynomial) [JaMc95]_.
jmd95.py:
`<http://mitgcm.org/\
download/daily_snapshot/MITgcm/utils/python/MITgcmutils/MITgcmutils/jmd95.py>`_
Parameters
----------
s: xarray.DatArray, array-like
salinity [psu (PSS-78)]
t: xarray.DatArray, array-like
potential temperature [degree C (IPTS-68)]
p: xarray.DatArray, array-like
pressure [dbar]
(p may have dims 1x1, mx1, 1xn or mxn for S(mxn))
Returns
-------
rho: xarray.DatArray, array-like
density [kg/m^3]
References
----------
.. [JaMc95]
Jackett, D.R. and T.J. Mcdougall, 1995:\
Minimal Adjustment of Hydrographic Profiles\
to Achieve Static Stability.\
J. Atmos. Oceanic Technol., 12, 381–389,\
https://doi.org/10.1175/1520-0426(1995)012<0381:MAOHPT>2.0.CO;2
"""
# make sure arguments are floating point
for var in [s, t, p]:
if isinstance(var, _xr.DataArray):
var = var.astype("float")
else:
var = _np.asfarray(var)
# coefficients nonlinear equation of state in pressure coordinates for
# 1. density of fresh water at p = 0
eosJMDCFw = [
999.842594,
6.793952e-02,
-9.095290e-03,
1.001685e-04,
-1.120083e-06,
6.536332e-09,
]
# 2. density of sea water at p = 0
eosJMDCSw = [
8.244930e-01,
-4.089900e-03,
7.643800e-05,
-8.246700e-07,
5.387500e-09,
-5.724660e-03,
1.022700e-04,
-1.654600e-06,
4.831400e-04,
]
# coefficients in pressure coordinates for
# 3. secant bulk modulus K of fresh water at p = 0
eosJMDCKFw = [1.965933e04, 1.444304e02, -1.706103e00, 9.648704e-03, -4.190253e-05]
# 4. secant bulk modulus K of sea water at p = 0
eosJMDCKSw = [
5.284855e01,
-3.101089e-01,
6.283263e-03,
-5.084188e-05,
3.886640e-01,
9.085835e-03,
-4.619924e-04,
]
# 5. secant bulk modulus K of sea water at p
eosJMDCKP = [
3.186519e00,
2.212276e-02,
-2.984642e-04,
1.956415e-06,
6.704388e-03,
-1.847318e-04,
2.059331e-07,
1.480266e-04,
2.102898e-04,
-1.202016e-05,
1.394680e-07,
-2.040237e-06,
6.128773e-08,
6.207323e-10,
]
# convert pressure to bar
p = 0.1 * p
p2 = p * p
t2 = t * t
t3 = t2 * t
t4 = t3 * t
s3o2 = s * _np.sqrt(s)
# density of freshwater at the surface
rho = (
eosJMDCFw[0]
+ eosJMDCFw[1] * t
+ eosJMDCFw[2] * t2
+ eosJMDCFw[3] * t3
+ eosJMDCFw[4] * t4
+ eosJMDCFw[5] * t4 * t
)
# density of sea water at the surface
rho = (
rho
+ s
* (
eosJMDCSw[0]
+ eosJMDCSw[1] * t
+ eosJMDCSw[2] * t2
+ eosJMDCSw[3] * t3
+ eosJMDCSw[4] * t4
)
+ s3o2 * (eosJMDCSw[5] + eosJMDCSw[6] * t + eosJMDCSw[7] * t2)
+ eosJMDCSw[8] * s * s
)
# secant bulk modulus of fresh water at the surface
bulkmod = (
eosJMDCKFw[0]
+ eosJMDCKFw[1] * t
+ eosJMDCKFw[2] * t2
+ eosJMDCKFw[3] * t3
+ eosJMDCKFw[4] * t4
)
# secant bulk modulus of sea water at the surface
bulkmod = (
bulkmod
+ s
* (eosJMDCKSw[0] + eosJMDCKSw[1] * t + eosJMDCKSw[2] * t2 + eosJMDCKSw[3] * t3)
+ s3o2 * (eosJMDCKSw[4] + eosJMDCKSw[5] * t + eosJMDCKSw[6] * t2)
)
# secant bulk modulus of sea water at pressure p
bulkmod = (
bulkmod
+ p * (eosJMDCKP[0] + eosJMDCKP[1] * t + eosJMDCKP[2] * t2 + eosJMDCKP[3] * t3)
+ p * s * (eosJMDCKP[4] + eosJMDCKP[5] * t + eosJMDCKP[6] * t2)
+ p * s3o2 * eosJMDCKP[7]
+ p2 * (eosJMDCKP[8] + eosJMDCKP[9] * t + eosJMDCKP[10] * t2)
+ p2 * s * (eosJMDCKP[11] + eosJMDCKP[12] * t + eosJMDCKP[13] * t2)
)
rho = rho / (1.0 - p / bulkmod)
return rho
[docs]
def densmdjwf(s, t, p):
"""
Density of Sea Water using McDougall et al. 2003 (JAOT 20)
polynomial (Gibbs Potential) [McJa03]_.
mdjwf.py:
`<https://github.com/\
MITgcm/MITgcm/blob/master/utils/python/MITgcmutils/MITgcmutils/mdjwf.py>`_
Parameters
----------
s: xarray.DatArray, array-like
salinity [psu (PSS-78)]
t: xarray.DatArray, array-like
potential temperature [degree C (IPTS-68)]
p: xarray.DatArray, array-like
pressure [dbar]
(p may have dims 1x1, mx1, 1xn or mxn for S(mxn))
Returns
-------
rho: xarray.DatArray, array-like
density [kg/m^3]
References
----------
.. [McJa03]
McDougall, T.J., D.R. Jackett, D.G. Wright, and R. Feistel, 2003:\
Accurate and Computationally Efficient Algorithms for\
Potential Temperature and Density of Seawater.\
J. Atmos. Oceanic Technol., 20, 730–741,\
https://doi.org/10.1175/1520-0426(2003)20<730:AACEAF>2.0.CO;2
"""
# make sure arguments are floating point
for var in [s, t, p]:
if isinstance(var, _xr.DataArray):
var = var.astype("float")
else:
var = _np.asfarray(var)
# coefficients nonlinear equation of state in pressure coordinates for
eosMDJWFnum = [
7.35212840e00,
-5.45928211e-02,
3.98476704e-04,
2.96938239e00,
-7.23268813e-03,
2.12382341e-03,
1.04004591e-02,
1.03970529e-07,
5.18761880e-06,
-3.24041825e-08,
-1.23869360e-11,
9.99843699e02,
]
eosMDJWFden = [
7.28606739e-03,
-4.60835542e-05,
3.68390573e-07,
1.80809186e-10,
2.14691708e-03,
-9.27062484e-06,
-1.78343643e-10,
4.76534122e-06,
1.63410736e-09,
5.30848875e-06,
-3.03175128e-16,
-1.27934137e-17,
1.00000000e00,
]
p1 = _copy.copy(p)
t1 = _copy.copy(t)
t2 = t * t
s1 = _copy.copy(s)
sp5 = _np.sqrt(s1)
p1t1 = p1 * t1
num = (
eosMDJWFnum[11]
+ t1 * (eosMDJWFnum[0] + t1 * (eosMDJWFnum[1] + eosMDJWFnum[2] * t1))
+ s1 * (eosMDJWFnum[3] + eosMDJWFnum[4] * t1 + eosMDJWFnum[5] * s1)
+ p1
* (
eosMDJWFnum[6]
+ eosMDJWFnum[7] * t2
+ eosMDJWFnum[8] * s1
+ p1 * (eosMDJWFnum[9] + eosMDJWFnum[10] * t2)
)
)
den = (
eosMDJWFden[12]
+ t1
* (
eosMDJWFden[0]
+ t1 * (eosMDJWFden[1] + t1 * (eosMDJWFden[2] + t1 * eosMDJWFden[3]))
)
+ s1
* (
eosMDJWFden[4]
+ t1 * (eosMDJWFden[5] + eosMDJWFden[6] * t2)
+ sp5 * (eosMDJWFden[7] + eosMDJWFden[8] * t2)
)
+ p1 * (eosMDJWFden[9] + p1t1 * (eosMDJWFden[10] * t2 + eosMDJWFden[11] * p1))
)
epsln = 0
denom = 1.0 / (epsln + den)
rho = num * denom
return rho
def static_pressure(Z): # pragma: no cover
"""
Returns the static pressure given depth.
"""
# Coefficients to determine static pressure
c = [0.059808, -0.025, 0.100766, 2.28405e-7]
P = c[0] * _np.exp((c[1] * Z) - 1) + c[2] * Z + c[3] * (Z**2)
P = 10 * P # decibars
return P
[docs]
def Coriolis_parameter(Y, omega=7.2921e-5):
"""
Compute Coriolis parameter (both vertical and horizontal components).
.. math::
(f, e) = (\\hat{\\mathbf{z}}\\cdot
\\left(2\\mathbf{\\Omega}\\right),
\\hat{\\mathbf{y}}\\cdot\\left(2\\mathbf{\\Omega}\\right))
= (2|\\mathbf{\\Omega}|\\sin{\\theta},
2|\\mathbf{\\Omega}|\\cos{\\theta})
Parameters
----------
Y: _xr.DataArray or array_like
Y coordinate (latitude)
omega: scalar
Rotation rate of Earth (rad/s)
Returns
-------
f: _xr.DataArray or array_like
Vertical component of the Earth's rotation vector
e: _xr.DataArray or array_like
Horizontal component of the Earth's rotation vector
"""
# TODO: check Y and omega
# Convert
Y_rad = _np.deg2rad(Y)
f = 2 * omega * _np.sin(Y_rad)
e = 2 * omega * _np.cos(Y_rad)
return f, e
def get_maskH(ds, add_Hbdr, XRange, YRange, ref_lon=0):
"""Define this function to avoid repeated code. First time this runs,
the objective is to figure out which faces survive the cutout. This info
is then passed, when transforming llc-grids, to llc_rearrange. Second
time this code runs, it gets applied on a dataset without faces as a
dimension.
"""
if "face" in ds.dims and len(ds.X) == 4320: # pragma: no cover
args = {"Xp1": slice(0, 4320, 10), "Yp1": slice(0, 4320, 10)}
ds = _copy.deepcopy(ds.isel(**args))
maskH = _xr.ones_like(ds["XG"])
if YRange is not None:
# Use arrays
YRange = _np.asarray([_np.min(YRange) - add_Hbdr, _np.max(YRange) + add_Hbdr])
YRange = YRange.astype(ds["YG"].dtype)
# Get the closest
for i, Y in enumerate(YRange):
diff = _np.fabs(ds["YG"] - Y)
YRange[i] = ds["YG"].where(diff == diff.min()).min().values
maskH = maskH.where(
_np.logical_and(ds["YG"] >= YRange[0], ds["YG"] <= YRange[-1]), 0
)
if XRange is not None:
# Use arrays
XRange = _np.asarray([XRange[0] - add_Hbdr, XRange[-1] + add_Hbdr])
XRange = XRange.astype(ds["XG"].dtype)
# Get the closest
for i, X in enumerate(XRange):
diff = _np.fabs(ds["XG"] - X)
XRange[i] = ds["XG"].where(diff == diff.min()).min().values
maskH = maskH.where(
_np.logical_and(
_rel_lon(ds["XG"], ref_lon) >= _rel_lon(XRange[0], ref_lon),
_rel_lon(ds["XG"], ref_lon) <= _rel_lon(XRange[-1], ref_lon),
),
0,
)
# Can't be all zeros
if maskH.sum() == 0:
raise ValueError("Zero grid points in the horizontal range")
# Find horizontal indexes
maskH = maskH.assign_coords(
Yp1=_np.arange(len(maskH["Yp1"])), Xp1=_np.arange(len(maskH["Xp1"]))
)
dmaskH = maskH.where(maskH.compute(), drop=True)
return maskH, dmaskH, XRange, YRange
def reset_dim(_ds, N, dim="mooring"):
"""resets the dimension mooring by shifting it by a value set by N"""
_ds["n" + dim] = N + _ds[dim]
_ds = _ds.swap_dims({dim: "n" + dim}).drop_vars(dim).rename({"n" + dim: dim})
return _ds
def diff_and_inds_where_insert(ix, iy):
dx, dy = (_np.diff(ii) for ii in (ix, iy))
inds = _np.argwhere(_np.abs(dx) + _np.abs(dy) > 1).squeeze()
return dx, dy, inds
def remove_repeated(_iX, _iY):
"""Attemps to remove repeated coords not adjascent to each other,
while retaining the trajectory property of being simply connected
(i.e. the distance between each index point is one). If it cannot
remove repeated coordinate values, returns the original array.
"""
_ix, _iy = _iX, _iY
nn = []
for n in range(len(_ix)):
val = _np.where(abs(_ix - _ix[n]) + abs(_iy - _iy[n]) == 0)[0]
# select only repeated values with deg of multiplicity = 2
if len(val) == 2:
if len(nn) == 0:
nn.append(list(val))
if len(nn) > 0 and (val != nn).all():
nn.append(list(val))
if _np.array(nn).size:
dn = [nn[i][1] - nn[i][0] for i in range(len(nn))]
# remove if the distance between repeated coords is 2
mask = _np.where(_np.array(dn) == 2)[0]
remove = [nn[i][1] for i in mask]
_ix, _iy = (_np.delete(ii, remove) for ii in (_ix, _iy))
# find the hole left
mask = _np.abs(_np.diff(_ix)) + _np.abs(_np.diff(_iy)) == 2
# delete hole left behind
_ix, _iy = (_np.delete(ii, _np.argwhere(mask)) for ii in (_ix, _iy))
# verify path is simply connected
dx, dy, inds = diff_and_inds_where_insert(_ix, _iy)
if inds.size: # pragma: no cover
_ix = _iX
_iY = _iY
return _ix, _iy
def connector(_ix, _iy):
"""
Takes a collection of points defined in logical space (ix, iy), each of
equal length arrays, and returns a new continous array (_ix, _iy) that
contains the original elements now with unit spacing.
"""
if len(_ix) == len(_iy) == 1:
return _ix, _iy
else:
mask = _np.abs(_np.diff(_ix)) + _np.abs(_np.diff(_iy)) == 0
_ix, _iy = (_np.delete(ii, _np.argwhere(mask)) for ii in (_ix, _iy))
# Initialize variables"
dx, dy, inds = diff_and_inds_where_insert(_ix, _iy)
while inds.size:
dx, dy = (di[inds] for di in (dx, dy))
mask = _np.abs(dx * dy) == 1
_ix = _np.insert(_ix, inds + 1, _ix[inds] + (dx / 2).astype(int))
_iy = _np.insert(
_iy, inds + 1, _iy[inds] + _np.where(mask, dy, (dy / 2).astype(int))
)
# Prepare for next iteration
dx, dy, inds = diff_and_inds_where_insert(_ix, _iy)
_iX, _iY = remove_repeated(_ix, _iy)
return _iX, _iY