"""
Open OceanDataset objects.
"""
# Instructions for developers:
# 1. All functions in this module must return an OceanDataset.
# 2. Add new functions in docs/api.rst
import urllib as _urllib
import warnings as _warnings
from collections import OrderedDict as _OrderedDict
# Import oceanspy dependencies (private)
import xarray as _xr
import yaml as _yaml
# Import from oceanspy (private)
from ._oceandataset import OceanDataset as _OceanDataset
from ._ospy_utils import _check_instance, _restore_coord_attrs
# Import extra modules (private)
try:
import xmitgcm as _xmitgcm
except ImportError: # pragma: no cover
pass
try:
import intake as _intake
from intake.catalog.exceptions import ValidationError
except ImportError: # pragma: no cover
pass
[docs]
def from_netcdf(path, **kwargs):
"""
Load an OceanDataset from a netcdf file.
Parameters
----------
path: str
Path from which to read.
**kwargs:
Keyword arguments for :py:func:`xarray.open_dataset`
Returns
-------
od: OceanDataset
References
----------
http://xarray.pydata.org/en/stable/generated/xarray.open_dataset.html
"""
# Check parameters
_check_instance({"path": path}, "str")
# Open
print("Opening dataset from [{}].".format(path))
ds = _xr.open_dataset(path, **kwargs)
# Put back coordinates attribute that to_netcdf didn't like
ds = _restore_coord_attrs(ds)
# Create and return od
od = _OceanDataset(ds)
return od
[docs]
def from_zarr(path, **kwargs):
"""
Load an OceanDataset from a Zarr store.
Parameters
----------
path: str
Path from which to read.
**kwargs:
Keyword arguments for :py:func:`xarray.open_zarr`
Returns
-------
od: OceanDataset
References
----------
http://xarray.pydata.org/en/stable/generated/xarray.open_zarr.html
"""
# Check parameters
_check_instance({"path": path}, "str")
# Open
print("Opening dataset from [{}].".format(path))
ds = _xr.open_zarr(path, **kwargs)
# Put back coordinates attribute that to_netcdf didn't like
ds = _restore_coord_attrs(ds)
# Create and return od
od = _OceanDataset(ds)
return od
[docs]
def from_catalog(name, catalog_url=None):
"""
Import oceandataset using a yaml catalog.
Try to use :py:mod:`intake-xarray`,
otherwise use :py:mod:`intake-xarray` and
:py:func:`xmitgcm.open_mdsdataset`.
Parameters
----------
name: str
Name of the oceandataset to open.
catalog_url: str or None
Path from which to read the catalog.
If None, use SciServer's catalogs.
References
----------
| intake-xarray: https://github.com/intake/intake-xarray
| xmitgcm: https://xmitgcm.readthedocs.io/en/stable/usage.html
"""
# Message
print("Opening {}.".format(name))
cat, entries, url, intake_switch = _find_entries(name, catalog_url)
# Store all dataset
datasets = []
metadata = {}
for entry in entries:
if intake_switch:
# Use intake-xarray
# Pop metadata
mtdt = cat[entry].metadata
# Create ds
ds = cat[entry].to_dask()
else:
# Pop args and metadata
args = cat[entry].pop("args")
mtdt = cat[entry].pop("metadata", None)
# If iter is a string, need to be evaluated (likely range)
iters = args.pop("iters", None)
if isinstance(iters, str) and "range" in iters:
iters = eval(iters)
if iters is not None:
args["iters"] = iters
# Create ds
with _warnings.catch_warnings():
# Not sure why, Marcello's print a lot of warnings, Neil no.
# TODO: this need to be addressed
_warnings.simplefilter("ignore")
ds = _xmitgcm.open_mdsdataset(**args)
# Rename
rename = mtdt.pop("rename", None)
ds = ds.rename(rename)
# swaps dimension k (index space) to Z (depth) - LLC data
swap_dims = mtdt.pop("swap_dims", None)
if swap_dims is not None:
ds = ds.swap_dims(swap_dims)
# drop k dimension
# ds = ds.drop_vars(["k_p1", "k_u", "k_l", "k"]) This needs fixing
# Fix Z dimensions (Zmd, ...)
default_Zs = ["Zp1", "Zu", "Zl", "Z"]
# Make sure they're sorted with decreasing letter number
default_Zs = sorted(default_Zs, key=len, reverse=True)
for Zdim in default_Zs: # pragma: no cover
for dim, size in ds.sizes.items():
if dim in default_Zs:
continue
elif Zdim in dim:
if size == 1:
ds = ds.squeeze(dim)
else:
if Zdim in ds.dims:
ds = ds.rename({Zdim: "tmp"})
ds = ds.rename({"tmp": Zdim, dim: Zdim})
else:
ds = ds.rename({dim: Zdim})
# Original output
or_out = mtdt.pop("original_output", None)
if or_out is not None:
for var in ds.data_vars:
ds[var].attrs["original_output"] = or_out
# Select
isel = mtdt.pop("isel", None)
if isel is not None:
isel = {key: eval(value) for key, value in isel.items()}
ds = ds.isel(isel)
# Append
datasets.append(ds)
# Metadata
metadata = {**metadata, **mtdt}
# Merge
ds = _xr.merge(datasets)
# Consistent chunking
chunks = {}
for var in ds.data_vars:
if ds[var].chunks is not None:
for i, dim in enumerate(ds[var].dims):
chunk = ds[var].chunks[i]
if dim not in chunks or len(chunks[dim]) < len(chunk):
chunks[dim] = chunk
ds = ds.chunk(chunks)
# Initialize OceanDataset
od = _OceanDataset(ds)
# Shift averages
shift_averages = metadata.pop("shift_averages", None)
if shift_averages is not None:
od = od.shift_averages(**shift_averages)
# Set OceanSpy stuff
for var in ["aliases", "parameters", "name", "description", "projection"]:
val = metadata.pop(var, None)
if val is not None:
od = eval("od.set_{}(val)".format(var))
# Manipulate coordinates
manipulate_coords = metadata.pop("manipulate_coords", None)
if manipulate_coords is not None:
od = od.manipulate_coords(**manipulate_coords)
grid_periodic = metadata.pop("grid_periodic", None)
if grid_periodic is not None:
od = od.set_grid_periodic(**grid_periodic)
# Set grid coordinates
grid_coords = metadata.pop("grid_coords", None)
if grid_coords is not None:
od = od.set_grid_coords(**grid_coords)
# Set grid topology if present (e.g. llc4320)
face_connections = metadata.pop("face_connections", None)
if face_connections is not None:
od = od.set_face_connections(**face_connections)
# Set attributes (use xmitgcm)
try:
from xmitgcm import default_diagnostics
from xmitgcm.utils import parse_available_diagnostics
from xmitgcm.variables import (
extra_grid_variables,
horizontal_grid_variables,
mask_variables,
package_state_variables,
state_variables,
vertical_coordinates,
vertical_grid_variables,
volume_grid_variables,
)
diagnostics = parse_available_diagnostics(default_diagnostics.__file__)
variables = _OrderedDict(
list(vertical_coordinates.items())
+ list(horizontal_grid_variables.items())
+ list(vertical_grid_variables.items())
+ list(volume_grid_variables.items())
+ list(mask_variables.items())
+ list(state_variables.items())
+ list(package_state_variables.items())
+ list(extra_grid_variables.items())
)
variables = _OrderedDict({**diagnostics, **variables})
# My extra attributes
variables["Temp"] = variables.pop("T")
variables["HFacC"] = variables.pop("hFacC")
variables["HFacW"] = variables.pop("hFacW")
variables["HFacS"] = variables.pop("hFacS")
for var in ["HFacC", "HFacW", "HFacS"]:
variables[var]["attrs"]["units"] = " "
variables["phiHyd"] = variables.pop("PHIHYD")
variables["phiHydLow"] = dict(
attrs=dict(
long_name=(
"Phi-Hydrostatic at r-lower boundary"
"(bottom in z-coordinates,"
"top in p-coordinates)"
),
units=variables["phiHyd"]["attrs"]["units"],
)
)
variables["AngleCS"] = dict(
attrs=dict(
standard_name="Cos of grid orientation angle",
long_name="AngleCS",
units=" ",
coordinate="YC XC",
)
)
variables["AngleSN"] = dict(
attrs=dict(
standard_name="Sin of grid orientation angle",
long_name="AngleSN",
units=" ",
)
)
variables["dxF"] = dict(
attrs=dict(
standard_name="x cell face separation",
long_name="cell x size",
units="m",
)
)
variables["dyF"] = dict(
attrs=dict(
standard_name="y cell face separation",
long_name="cell y size",
units="m",
)
)
variables["dxV"] = dict(
attrs=dict(
standard_name="x v-velocity separation",
long_name="cell x size",
units="m",
)
)
variables["dyU"] = dict(
attrs=dict(
standard_name="y u-velocity separation",
long_name="cell y size",
units="m",
)
)
variables["fCori"] = dict(
attrs=dict(
standard_name="Coriolis f at cell center",
long_name="Coriolis f",
units="s^-1",
)
)
variables["fCoriG"] = dict(
attrs=dict(
standard_name="Coriolis f at cell corner",
long_name="Coriolis f",
units="s^-1",
)
)
# LLC4320 Surface Variables
variables["SSS"] = dict(
attrs=dict(
standard_name=("surface sea_water_salinity"),
units=variables["S"]["attrs"]["units"],
)
)
variables["SST"] = dict(
attrs=dict(
standard_name=("surface sea_water_temperature"),
units=variables["Temp"]["attrs"]["units"],
)
)
variables["SSU"] = dict(
attrs=dict(
standard_name=("surface sea_water_x_velocity"),
units=variables["U"]["attrs"]["units"],
)
)
variables["SSV"] = dict(
attrs=dict(
standard_name=("surface sea_water_y_velocity"),
units=variables["V"]["attrs"]["units"],
)
)
# Extract variables in dataset only
variables = _OrderedDict(
**{var: variables[var] for var in od._ds.variables if var in variables}
)
# Add attributes
for var in variables:
attrs = variables[var]["attrs"]
for attr in attrs:
if attr not in od._ds[var].attrs:
od._ds[var].attrs[attr] = attrs[attr]
except ImportError: # pragma: no cover
pass
# Print message
toprint = od.description
for add_str in ["citation", "characteristics", "mates"]:
thisprint = metadata.pop(add_str, None)
if thisprint is not None:
if add_str == "mates":
add_str = "see also"
if thisprint[-1:] == "\n":
thisprint = thisprint[:-1]
toprint += "\n{}:\n * {}".format(
add_str.capitalize(), thisprint.replace("\n", "\n * ")
)
if toprint is not None:
print(toprint.replace("\n\n", "\n"))
return od
def _find_entries(name, catalog_url):
"""
Function used by from_catalog to decode xarray, zarr or xmitgcm catalogs.
It is also used by conf.py in docs to create dataset.rst
Parameters
----------
name: str
Name of the oceandataset to open.
catalog_url: str or None
Path from which to read the catalog.
If None, use SciServer's catalogs.
Returns
-------
cat, entries, url, intake_switch
"""
# Check parameters
if catalog_url is None: # pragma: no cover
url = (
"https://raw.githubusercontent.com/hainegroup/oceanspy/"
"main/sciserver_catalogs/datasets_list.yaml"
)
f = _urllib.request.urlopen(url)
SCISERVER_DATASETS = _yaml.safe_load(f)["datasets"]["sciserver"]
if name not in SCISERVER_DATASETS:
raise ValueError(
"[{}] is not available on SciServer."
" Here is a list of available oceandatasets: {}."
"".format(name, SCISERVER_DATASETS)
)
else:
_check_instance({"catalog_url": catalog_url}, "str")
# Read catalog
try:
if catalog_url is None:
url = (
"https://raw.githubusercontent.com/hainegroup/oceanspy/"
"main/sciserver_catalogs/catalog_xarray.yaml"
)
else:
url = catalog_url
cat = _intake.open_catalog(url)
entries = [entry for entry in list(cat) if name in entry]
if len(entries) == 0:
raise ValidationError("", "")
intake_switch = True
except ValidationError:
if catalog_url is None:
url = (
"https://raw.githubusercontent.com/hainegroup/oceanspy/"
"main/sciserver_catalogs/catalog_xmitgcm.yaml"
)
else:
url = catalog_url # provided by user
# Is it an url?
try:
f = _urllib.request.urlopen(url)
cat = _yaml.safe_load(f)
except ValueError:
with open(url) as f:
cat = _yaml.safe_load(f)
entries = [entry for entry in list(cat) if name in entry]
intake_switch = False
# Error if not available
if len(entries) == 0:
raise ValueError("[{}] is not in the catalog.".format(name))
else:
return cat, entries, url, intake_switch