#!/usr/bin/env python
u"""
utilities.py
Written by Tyler Sutterley (10/2024)
Download and management utilities for syncing time and auxiliary files
Adds additional modules to the gravity_toolkit utilities
PYTHON DEPENDENCIES:
lxml: processing XML and HTML in Python (https://pypi.python.org/pypi/lxml)
utilities.py: download and management utilities for syncing files
UPDATE HISTORY:
Updated 10/2024: update CMR search utility to replace deprecated scrolling
https://cmr.earthdata.nasa.gov/search/site/docs/search/api.html
Updated 11/2023: updated ssl context to fix deprecation error
Updated 05/2023: use pathlib to define and operate on paths
Updated 01/2023: add default ssl context attribute with protocol
Updated 12/2022: functions for managing and maintaining git repositories
Updated 11/2022: use f-strings for formatting verbose or ascii output
Updated 10/2022: added option to use CMR provided GES DISC subsetting host
Updated 08/2022: hardcode GES DISC subsetting API hostname
Updated 06/2022: add NASA Common Metadata Repository (CMR) queries
added function to build GES DISC subsetting API requests
Updated 04/2022: updated docstrings to numpy documentation format
Written 01/2021
"""
# extend gravity_toolkit utilities
from __future__ import annotations
from gravity_toolkit.utilities import *
# PURPOSE: get the git hash value
[docs]
def get_git_revision_hash(
refname: str = 'HEAD',
short: bool = False
):
"""
Get the ``git`` hash value for a particular reference
Parameters
----------
refname: str, default HEAD
Symbolic reference name
short: bool, default False
Return the shorted hash value
"""
# get path to .git directory from current file path
filename = inspect.getframeinfo(inspect.currentframe()).filename
basepath = pathlib.Path(filename).absolute().parent.parent
gitpath = basepath.joinpath('.git')
# build command
cmd = ['git', f'--git-dir={gitpath}', 'rev-parse']
cmd.append('--short') if short else None
cmd.append(refname)
# get output
with warnings.catch_warnings():
return str(subprocess.check_output(cmd), encoding='utf8').strip()
# PURPOSE: get the current git status
[docs]
def get_git_status():
"""Get the status of a ``git`` repository as a boolean value
"""
# get path to .git directory from current file path
filename = inspect.getframeinfo(inspect.currentframe()).filename
basepath = pathlib.Path(filename).absolute().parent.parent
gitpath = basepath.joinpath('.git')
# build command
cmd = ['git', f'--git-dir={gitpath}', 'status', '--porcelain']
with warnings.catch_warnings():
return bool(subprocess.check_output(cmd))
[docs]
def _create_default_ssl_context() -> ssl.SSLContext:
"""Creates the default SSL context
"""
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
_set_ssl_context_options(context)
context.options |= ssl.OP_NO_COMPRESSION
return context
[docs]
def _create_ssl_context_no_verify() -> ssl.SSLContext:
"""Creates an SSL context for unverified connections
"""
context = _create_default_ssl_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
[docs]
def _set_ssl_context_options(context: ssl.SSLContext) -> None:
"""Sets the default options for the SSL context
"""
if sys.version_info >= (3, 10) or ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7):
context.minimum_version = ssl.TLSVersion.TLSv1_2
else:
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1
# default ssl context
_default_ssl_context = _create_ssl_context_no_verify()
# PURPOSE: list a directory on NASA GES DISC https server
[docs]
def gesdisc_list(
HOST: str | list,
username: str | None = None,
password: str | None = None,
build: bool = False,
timeout: int | None = None,
urs: str = 'urs.earthdata.nasa.gov',
parser = lxml.etree.HTMLParser(),
format: str = '%Y-%m-%d %H:%M',
pattern: str = '',
sort: bool = False
):
"""
List a directory on NASA GES DISC servers
Parameters
----------
HOST: str or list
remote https host
username: str or NoneType, default None
NASA Earthdata username
password: str or NoneType, default None
NASA Earthdata password
build: bool, default True
Build opener with NASA Earthdata credentials
timeout: int or NoneType, default None
timeout in seconds for blocking operations
urs: str, default 'urs.earthdata.nasa.gov'
Earthdata login URS 3 host
parser: obj, default lxml.etree.HTMLParser()
HTML parser for ``lxml``
format: str, default '%Y-%m-%d %H:%M'
format for input time string
pattern: str, default ''
regular expression pattern for reducing list
sort: bool, default False
sort output list
Returns
-------
colnames: list
column names in a directory
collastmod: list
last modification times for items in the directory
"""
# use netrc credentials
if build and not (username or password):
username,_,password = netrc.netrc().authenticators(urs)
# build urllib2 opener with credentials
if build:
build_opener(username, password, password_manager=True,
authorization_header=False)
# verify inputs for remote https host
if isinstance(HOST, str):
HOST = url_split(HOST)
# try listing from https
try:
# Create and submit request.
request=urllib2.Request(posixpath.join(*HOST))
response=urllib2.urlopen(request, timeout=timeout)
except (urllib2.HTTPError, urllib2.URLError):
raise Exception('List error from {0}'.format(posixpath.join(*HOST)))
else:
# read and parse request for files (column names and modified times)
tree = lxml.etree.parse(response,parser)
colnames = tree.xpath('//tr/td[not(@*)]//a/@href')
# get the Unix timestamp value for a modification time
lastmod = [get_unix_time(i,format=format)
for i in tree.xpath('//tr/td[@align="right"][1]/text()')]
# reduce using regular expression pattern
if pattern:
i = [i for i,f in enumerate(colnames) if re.search(pattern,f)]
# reduce list of column names and last modified times
colnames = [colnames[indice] for indice in i]
lastmod = [lastmod[indice] for indice in i]
# sort the list
if sort:
i = [i for i,j in sorted(enumerate(colnames), key=lambda i: i[1])]
# sort list of column names and last modified times
colnames = [colnames[indice] for indice in i]
lastmod = [lastmod[indice] for indice in i]
# return the list of column names and last modified times
return (colnames,lastmod)
# PURPOSE: filter the CMR json response for desired data files
[docs]
def cmr_filter_json(
search_results: dict,
endpoint: str = "data",
request_type: str = "application/x-netcdf"
):
"""
Filter the CMR json response for desired data files
Parameters
----------
search_results: dict
json response from CMR query
endpoint: str, default 'data'
url endpoint type
- ``'data'``: NASA Earthdata https archive
- ``'opendap'``: NASA Earthdata OPeNDAP archive
- ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
request_type: str, default 'application/x-netcdf'
data type for reducing CMR query
Returns
-------
granule_names: list
Model granule names
granule_urls: list
Model granule urls
granule_mtimes: list
Model granule modification times
"""
# output list of granule ids, urls and modified times
granule_names = []
granule_urls = []
granule_mtimes = []
# check that there are urls for request
if ('feed' not in search_results) or ('entry' not in search_results['feed']):
return (granule_names,granule_urls)
# descriptor links for each endpoint
rel = {}
rel['data'] = "http://esipfed.org/ns/fedsearch/1.1/data#"
rel['opendap'] = "http://esipfed.org/ns/fedsearch/1.1/service#"
rel['s3'] = "http://esipfed.org/ns/fedsearch/1.1/s3#"
# iterate over references and get cmr location
for entry in search_results['feed']['entry']:
granule_names.append(entry['producer_granule_id'])
granule_mtimes.append(get_unix_time(entry['updated'],
format='%Y-%m-%dT%H:%M:%S.%f%z'))
for link in entry['links']:
# skip inherited granules
if ('inherited' in link.keys()):
continue
# append if selected endpoint
if (link['rel'] == rel[endpoint]):
granule_urls.append(link['href'])
break
# alternatively append if selected data type
if ('type' not in link.keys()):
continue
if (link['type'] == request_type):
granule_urls.append(link['href'])
break
# return the list of urls, granule ids and modified times
return (granule_names, granule_urls, granule_mtimes)
# PURPOSE: cmr queries for model data products
[docs]
def cmr(
short_name: str,
version: str | int | None = None,
start_date: str | None = None,
end_date: str | None = None,
provider: str | None = 'GES_DISC',
endpoint: str | None = 'data',
request_type: str | None = 'application/x-netcdf',
verbose: bool = False,
fid = sys.stdout
):
"""
Query the NASA Common Metadata Repository (CMR) for model data
Parameters
----------
short_name: str
Model shortname in the CMR system
version: str or NoneType, default None
Model version
start_date: str or NoneType, default None
starting date for CMR product query
end_date: str or NoneType, default None
ending date for CMR product query
provider: str, default 'GES_DISC'
CMR data provider
- ``'GES_DISC'``: GESDISC
- ``'GESDISCCLD'``: GESDISC Cumulus
- ``'PODAAC'``: PO.DAAC Drive
- ``'POCLOUD'``: PO.DAAC Cumulus
endpoint: str, default 'data'
url endpoint type
- ``'data'``: NASA Earthdata https archive
- ``'opendap'``: NASA Earthdata OPeNDAP archive
- ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
request_type: str, default 'application/x-netcdf'
data type for reducing CMR query
verbose: bool, default False
print CMR query information
fid: obj, default sys.stdout
open file object to print if verbose
Returns
-------
granule_names: list
Model granule names
granule_urls: list
Model granule urls
granule_mtimes: list
Model granule modification times
"""
# create logger
loglevel = logging.INFO if verbose else logging.CRITICAL
logging.basicConfig(stream=fid, level=loglevel)
# build urllib2 opener with SSL context
# https://docs.python.org/3/howto/urllib2.html#id5
handler = []
# Create cookie jar for storing cookies
cookie_jar = CookieJar()
handler.append(urllib2.HTTPCookieProcessor(cookie_jar))
handler.append(urllib2.HTTPSHandler(context=_default_ssl_context))
# create "opener" (OpenerDirector instance)
opener = urllib2.build_opener(*handler)
# build CMR query
cmr_query_type = 'granules'
cmr_format = 'json'
cmr_page_size = 2000
CMR_HOST = ['https://cmr.earthdata.nasa.gov','search',
f'{cmr_query_type}.{cmr_format}']
# build list of CMR query parameters
CMR_KEYS = []
CMR_KEYS.append(f'?provider={provider}')
CMR_KEYS.append('&sort_key[]=start_date')
CMR_KEYS.append('&sort_key[]=producer_granule_id')
CMR_KEYS.append(f'&page_size={cmr_page_size}')
# dictionary of product shortnames and version
CMR_KEYS.append(f'&short_name={short_name}')
if version:
CMR_KEYS.append(f'&version={version}')
# append keys for start and end time
# verify that start and end times are in ISO format
start_date = isoformat(start_date) if start_date else ''
end_date = isoformat(end_date) if end_date else ''
CMR_KEYS.append(f'&temporal={start_date},{end_date}')
# full CMR query url
cmr_query_url = "".join([posixpath.join(*CMR_HOST),*CMR_KEYS])
logging.info(f'CMR request={cmr_query_url}')
# output list of granule names and urls
granule_names = []
granule_urls = []
granule_mtimes = []
cmr_search_after = None
while True:
req = urllib2.Request(cmr_query_url)
# add CMR search after header
if cmr_search_after:
req.add_header('CMR-Search-After', cmr_search_after)
logging.debug(f'CMR-Search-After: {cmr_search_after}')
response = opener.open(req)
# get search after index for next iteration
headers = {k.lower():v for k,v in dict(response.info()).items()}
cmr_search_after = headers.get('cmr-search-after')
# read the CMR search as JSON
search_page = json.loads(response.read().decode('utf8'))
ids,urls,mtimes = cmr_filter_json(search_page,
endpoint=endpoint, request_type=request_type)
if not urls or cmr_search_after is None:
break
# extend lists
granule_names.extend(ids)
granule_urls.extend(urls)
granule_mtimes.extend(mtimes)
# return the list of granule ids, urls and modification times
return (granule_names, granule_urls, granule_mtimes)
# PURPOSE: build requests for the GES DISC subsetting API
[docs]
def build_request(
short_name: str,
dataset_version: str | int,
url: str | None,
host: str | None = None,
variables: list | None = [],
format: str | None = 'bmM0Lw',
service: str | None = 'L34RS_MERRA2',
version: str | None = '1.02',
bbox: list | None = [-90,-180,90,180],
**kwargs
):
"""
Build requests for the GES DISC subsetting API
Parameters
----------
short_name: str
Model shortname in the CMR system
dataset_version: str
Model version
url: str
url for granule returned by the CMR system
host: str or NoneType, default None
Override host provider for GES DISC subsetting
Default is host provider given by CMR request
variables: list, default []
Variables for product to subset
format: str, default 'bmM0Lw'
Coded output format for GES DISC subsetting API
service: str, default 'L34RS_MERRA2'
GES DISC subsetting API service
version: str, default '1.02'
GES DISC subsetting API service version
bbox: list, default [-90,-180,90,180]
Bounding box to spatially subset
**kwargs: dict, default {}
Additional parameters for GES DISC subsetting API
Returns
-------
request_url: str
Formatted url for GES DISC subsetting API
"""
# split CMR supplied url for granule
HOST,*args = url_split(url)
host = HOST if (host is None) else host
api_host = posixpath.join(host,'daac-bin','OTF','HTTP_services.cgi?')
# create parameters to be encoded
kwargs['FILENAME'] = posixpath.join(posixpath.sep, *args)
kwargs['FORMAT'] = format
kwargs['SERVICE'] = service
kwargs['VERSION'] = version
kwargs['BBOX'] = ','.join(map(str, bbox))
kwargs['SHORTNAME'] = short_name
kwargs['DATASET_VERSION'] = dataset_version
kwargs['VARIABLES'] = ','.join(variables)
# return the formatted request url
request_url = api_host + urlencode(kwargs)
return request_url