# sliderule.sliderule

# Copyright (c) 2021, University of Washington
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# 3. Neither the name of the University of Washington nor the names of its
#    contributors may be used to endorse or promote products derived from this
#    software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF WASHINGTON AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF WASHINGTON OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
import netrc
import requests
import socket
import json
import struct
import ctypes
import time
import logging
import warnings
import numpy
import geopandas
from shapely.geometry import Polygon
from datetime import datetime
from sliderule import version

###############################################################################
# GLOBALS
###############################################################################

# Coordinate Reference System definition for the WGS84 ellipsoid ensemble:
# a 2D coordinate system on the surface of the reference ellipsoid.
EPSG_WGS84 = "EPSG:4326"

# This is 3D CRS for ITRF2014 realization with 2010.0 epoch: https://epsg.io/7912
# Note: using GRS80 ellipsoid, negligible difference in inverse flattening from WGS84
SLIDERULE_EPSG = "EPSG:7912"

PUBLIC_URL = "slideruleearth.io"
PUBLIC_ORG = "sliderule"

service_url = PUBLIC_URL
service_org = PUBLIC_ORG

session = requests.Session()
session.trust_env = False

ps_refresh_token = None
ps_access_token = None
ps_token_exp = None

MAX_PS_CLUSTER_WAIT_SECS = 600

request_timeout = (10, 120) # (connection, read) in seconds
decode_aux = True

logger = logging.getLogger(__name__)
console = None

clustering_enabled = False
try:
    from sklearn.cluster import KMeans
    clustering_enabled = True
except Exception:
    logger.info("Unable to import sklearn... clustering support disabled")

recdef_table = {}

arrow_file_table = {}

profiles = {}

gps_epoch = datetime(1980, 1, 6)
tai_epoch = datetime(1970, 1, 1, 0, 0, 10)
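
#
# Illustrative sketch (not part of the client API): offsets a GPS time in
# seconds from gps_epoch to produce a naive datetime. This ignores the
# GPS-to-UTC leap second difference, which the server-side "time" endpoint
# used by gps2utc() accounts for.
#
def __demo_gps_to_naive_datetime(gps_seconds):
    from datetime import timedelta
    return gps_epoch + timedelta(seconds=gps_seconds)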

eventformats = {
    "TEXT":     0,
    "JSON":     1
}

eventlogger = {
    0: logger.debug,
    1: logger.info,
    2: logger.warning,
    3: logger.error,
    4: logger.critical
}

datatypes = {
    "TEXT":     0,
    "REAL":     1,
    "INTEGER":  2,
    "DYNAMIC":  3
}

basictypes = {
    "INT8":     { "fmt": 'b', "size": 1, "nptype": numpy.int8   },
    "INT16":    { "fmt": 'h', "size": 2, "nptype": numpy.int16  },
    "INT32":    { "fmt": 'i', "size": 4, "nptype": numpy.int32  },
    "INT64":    { "fmt": 'q', "size": 8, "nptype": numpy.int64  },
    "UINT8":    { "fmt": 'B', "size": 1, "nptype": numpy.uint8  },
    "UINT16":   { "fmt": 'H', "size": 2, "nptype": numpy.uint16 },
    "UINT32":   { "fmt": 'I', "size": 4, "nptype": numpy.uint32 },
    "UINT64":   { "fmt": 'Q', "size": 8, "nptype": numpy.uint64 },
    "BITFIELD": { "fmt": 'x', "size": 0, "nptype": numpy.byte   },  # unsupported
    "FLOAT":    { "fmt": 'f', "size": 4, "nptype": numpy.single },
    "DOUBLE":   { "fmt": 'd', "size": 8, "nptype": numpy.double },
    "TIME8":    { "fmt": 'q', "size": 8, "nptype": numpy.int64  }, # numpy.datetime64
    "STRING":   { "fmt": 's', "size": 1, "nptype": numpy.byte   }
}
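
#
# Illustrative sketch (not part of the client API): shows how a basictypes
# entry drives the struct format strings built during record decoding; the
# element count and packed values here are hypothetical.
#
def __demo_basictype_unpack():
    fmt = '<' + str(4) + basictypes["FLOAT"]["fmt"] # '<4f': little-endian, 4 elements
    raw = struct.pack(fmt, 1.0, 2.0, 3.0, 4.0)      # 16 bytes (4 * FLOAT size)
    return struct.unpack_from(fmt, raw, 0)          # (1.0, 2.0, 3.0, 4.0)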

codedtype2str = {
    0:  "INT8",
    1:  "INT16",
    2:  "INT32",
    3:  "INT64",
    4:  "UINT8",
    5:  "UINT16",
    6:  "UINT32",
    7:  "UINT64",
    8:  "BITFIELD",
    9:  "FLOAT",
    10: "DOUBLE",
    11: "TIME8",
    12: "STRING"
}

###############################################################################
# CLIENT EXCEPTIONS
###############################################################################

class FatalError(RuntimeError):
    pass

class TransientError(RuntimeError):
    pass

###############################################################################
# UTILITIES
###############################################################################

#
# __StreamSource
#
class __StreamSource:
    def __init__(self, data):
        self.source = data
    def __iter__(self):
        for line in self.source.iter_content(None):
            yield line

#
# __BufferSource
#
class __BufferSource:
    def __init__(self, data):
        self.source = data
    def __iter__(self):
        yield self.source

#
#  __populate
#
def __populate(rectype):
    global recdef_table
    if rectype not in recdef_table:
        recdef_table[rectype] = source("definition", {"rectype" : rectype})
    return recdef_table[rectype]

#
#  __parse_json
#
def __parse_json(data):
    """
    data: request response
    """
    lines = []
    for line in data:
        lines.append(line)
    response = b''.join(lines)
    return json.loads(response)

#
#  __decode_native
#
def __decode_native(rectype, rawdata):
    """
    rectype: record type supplied in response (string)
    rawdata: payload supplied in response (byte array)
    """
    global recdef_table

    # initialize record
    rec = { "__rectype": rectype }

    # get/populate record definition #
    recdef = __populate(rectype)

    # iterate through each field in definition
    for fieldname in recdef.keys():

        # a double underscore (__) prefix indicates metadata
        if fieldname.find("__") == 0:
            continue

        # get field properties
        field = recdef[fieldname]
        ftype = field["type"]
        offset = int(field["offset"] / 8)
        elems = field["elements"]
        flags = field["flags"]

        # do not process pointers
        if "PTR" in flags:
            continue

        # skip auxiliary fields when aux decoding is disabled
        if not decode_aux and "AUX" in flags:
            continue

        # get endianness
        if "LE" in flags:
            endian = '<'
        else:
            endian = '>'

        # decode basic type
        if ftype in basictypes:

            # check if array
            is_array = not (elems == 1)

            # get number of elements
            if elems <= 0:
                elems = int((len(rawdata) - offset) / basictypes[ftype]["size"])

            # build format string
            fmt = endian + str(elems) + basictypes[ftype]["fmt"]

            # parse data
            value = struct.unpack_from(fmt, rawdata, offset)

            # set field
            if ftype == "STRING":
                rec[fieldname] = ctypes.create_string_buffer(value[0]).value.decode('ascii')
            elif is_array:
                rec[fieldname] = value
            else:
                rec[fieldname] = value[0]

        # decode user type
        else:

            # populate record definition (if needed) #
            subrecdef = __populate(ftype)

            # check if array
            is_array = not (elems == 1)

            # get number of elements
            if elems <= 0:
                elems = int((len(rawdata) - offset) / subrecdef["__datasize"])

            # return parsed data
            if is_array:
                rec[fieldname] = []
                for e in range(elems):
                    rec[fieldname].append(__decode_native(ftype, rawdata[offset:]))
                    offset += subrecdef["__datasize"]
            else:
                rec[fieldname] = __decode_native(ftype, rawdata[offset:])

    # return record #
    return rec

#
#  __parse_native
#
def __parse_native(data, callbacks):
    """
    data: request response
    """
    recs = []

    rec_hdr_size = 8
    rec_size_index = 0
    rec_size_rsps = []

    rec_size = 0
    rec_index = 0
    rec_rsps = []

    duration = 0.0

    for line in data:

        # Capture Start Time (for duration)
        tstart = time.perf_counter()

        # Process Line Read
        i = 0
        while i < len(line):

            # Parse Record Size
            if(rec_size_index < rec_hdr_size):
                bytes_available = len(line) - i
                bytes_remaining = rec_hdr_size - rec_size_index
                bytes_to_append = min(bytes_available, bytes_remaining)
                rec_size_rsps.append(line[i:i+bytes_to_append])
                rec_size_index += bytes_to_append
                if(rec_size_index >= rec_hdr_size):
                    raw = b''.join(rec_size_rsps)
                    rec_version, rec_type_size, rec_data_size = struct.unpack('>hhi', raw)
                    if rec_version != 2:
                        raise FatalError("Invalid record format: %d" % (rec_version))
                    rec_size = rec_type_size + rec_data_size
                    rec_size_rsps.clear()
                i += bytes_to_append

            # Parse Record
            elif(rec_size > 0):
                bytes_available = len(line) - i
                bytes_remaining = rec_size - rec_index
                bytes_to_append = min(bytes_available, bytes_remaining)
                rec_rsps.append(line[i:i+bytes_to_append])
                rec_index += bytes_to_append
                if(rec_index >= rec_size):
                    # Decode Record
                    rawbits = b''.join(rec_rsps)
                    rectype = ctypes.create_string_buffer(rawbits).value.decode('ascii')
                    rawdata = rawbits[len(rectype) + 1:]
                    rec     = __decode_native(rectype, rawdata)
                    if rectype == "conrec":
                        # parse records contained in record
                        buffer = __BufferSource(rawdata[rec["start"]:])
                        contained_recs = __parse_native(buffer, callbacks)
                        recs += contained_recs
                    else:
                        if callbacks != None and rectype in callbacks:
                            # Execute Call-Back on Record
                            callbacks[rectype](rec)
                        else:
                            # Append Record
                            recs.append(rec)
                    # Reset Record Parsing
                    rec_rsps.clear()
                    rec_size_index = 0
                    rec_size = 0
                    rec_index = 0
                i += bytes_to_append

            # Zero Sized Record
            else:
                rec_size_index = 0
                rec_index = 0

        # Capture Duration
        duration = duration + (time.perf_counter() - tstart)

    # Update Timing Profile
    profiles[__parse_native.__name__] = duration

    return recs
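
#
# Illustrative sketch (not part of the client API): builds the wire framing
# that __parse_native expects for a single record - an 8-byte big-endian
# header (int16 version = 2, int16 record type size, int32 data size),
# followed by the null-terminated record type name and the raw payload.
# Any record type name and payload passed in are hypothetical.
#
def __demo_frame_record(rectype, payload):
    name = rectype.encode('ascii') + b'\0'
    header = struct.pack('>hhi', 2, len(name), len(payload))
    return header + name + payload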


###############################################################################
# Overriding DNS
###############################################################################

#
# local_dns - global dictionary of DNS entries to override
#
local_dns = {}

#
# __initdns - resets the global dictionary
#
def __initdns():
    local_dns.clear()

#
# __dnsoverridden - checks if url is already overridden
#
def __dnsoverridden():
    global service_org, service_url
    url = service_org + "." + service_url
    return url.lower() in local_dns

#
# __jamdns - override the dns entry
#
def __jamdns():
    global service_url, service_org, request_timeout
    url = service_org + "." + service_url
    headers = buildauthheader()
    host = "https://ps." + service_url + "/api/org_ip_adr/" + service_org + "/"
    rsps = session.get(host, headers=headers, timeout=request_timeout).json()
    if rsps["status"] == "SUCCESS":
        ipaddr = rsps["ip_address"]
        local_dns[url.lower()] = ipaddr
        logger.info("Overriding DNS for {} with {}".format(url, ipaddr))

#
# __override_getaddrinfo - replaces the socket library callback
#
socket_getaddrinfo = socket.getaddrinfo
def __override_getaddrinfo(*args):
    url = args[0].lower()
    if url in local_dns:
        logger.info("getaddrinfo returned {} for {}".format(local_dns[url], url))
        return socket_getaddrinfo(local_dns[url], *args[1:])
    else:
        return socket_getaddrinfo(*args)
socket.getaddrinfo = __override_getaddrinfo
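
#
# Illustrative sketch (not part of the client API): once an entry is jammed
# into local_dns, any getaddrinfo() lookup of that hostname resolves to the
# stored address instead of querying DNS. The hostname below is hypothetical
# and the loopback address is used so the call succeeds offline.
#
def __demo_dns_override():
    local_dns["myorg.slideruleearth.io"] = "127.0.0.1"
    return socket.getaddrinfo("myorg.slideruleearth.io", 443)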


###############################################################################
# Default Record Processing
###############################################################################

#
#  __logeventrec
#
def __logeventrec(rec):
    eventlogger[rec['level']]('%s' % (rec["attr"]))

#
#  __exceptrec
#
def __exceptrec(rec):
    if rec["code"] >= 0:
        eventlogger[rec["level"]]("Exception <%d>: %s", rec["code"], rec["text"])
    else:
        eventlogger[rec["level"]]("%s", rec["text"])

#
#  __arrowrec
#
def __arrowrec(rec):
    global arrow_file_table
    try :
        filename = rec["filename"]
        if rec["__rectype"] == 'arrowrec.meta':
            if filename in arrow_file_table:
                raise FatalError("file transfer already in progress")
            arrow_file_table[filename] = { "fp": open(filename, "wb"), "size": rec["size"], "progress": 0 }
        else: # rec["__rectype"] == 'arrowrec.data'
            data = rec['data']
            file = arrow_file_table[filename]
            file["fp"].write(bytearray(data))
            file["progress"] += len(data)
            if file["progress"] >= file["size"]:
                file["fp"].close()
                del arrow_file_table[filename]
    except Exception as e:
        raise FatalError("Failed to process arrow file: {}".format(e))

#
#  Globals
#
__callbacks = {'eventrec': __logeventrec, 'exceptrec': __exceptrec, 'arrowrec.meta': __arrowrec, 'arrowrec.data': __arrowrec }


###############################################################################
# INTERNAL APIs
###############################################################################

#
#  buildauthheader
#
def buildauthheader(force_refresh=False):
    """
    Build authentication header for use with provisioning system
    """
    global service_url, ps_access_token, ps_refresh_token, ps_token_exp
    headers = None
    if ps_access_token:
        # Check if Refresh Needed
        if time.time() > ps_token_exp or force_refresh:
            host = "https://ps." + service_url + "/api/org_token/refresh/"
            rqst = {"refresh": ps_refresh_token}
            hdrs = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + ps_access_token}
            rsps = session.post(host, data=json.dumps(rqst), headers=hdrs, timeout=request_timeout).json()
            ps_refresh_token = rsps["refresh"]
            ps_access_token = rsps["access"]
            ps_token_exp =  time.time() + (float(rsps["access_lifetime"]) / 2)
        # Build Authentication Header
        headers = {'Authorization': 'Bearer ' + ps_access_token}
    return headers

#
# GeoDataFrame to Polygon
#
def gdf2poly(gdf):

    # latch start time
    tstart = time.perf_counter()

    # pull out coordinates
    hull = gdf.unary_union.convex_hull
    polygon = [{"lon": coord[0], "lat": coord[1]} for coord in list(hull.exterior.coords)]

    # determine winding of polygon #
    #              (x2               -    x1)             *    (y2               +    y1)
    wind = sum([(polygon[i+1]["lon"] - polygon[i]["lon"]) * (polygon[i+1]["lat"] + polygon[i]["lat"]) for i in range(len(polygon) - 1)])
    if wind > 0:
        # reverse direction (make counter-clockwise) #
        ccw_poly = []
        for i in range(len(polygon), 0, -1):
            ccw_poly.append(polygon[i - 1])
        # replace region with counter-clockwise version #
        polygon = ccw_poly

    # Update Profile
    profiles[gdf2poly.__name__] = time.perf_counter() - tstart

    # return polygon
    return polygon
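
#
# Illustrative sketch (not part of the client API): the winding test above is
# the sum of (x2 - x1) * (y2 + y1) over consecutive vertices, which equals
# the negative of twice the signed area; a positive sum means the ring is
# clockwise and must be reversed. For the clockwise triangle below:
#   (0-0)*(1+0) + (1-0)*(0+1) + (0-1)*(0+0) = 1 > 0
#
def __demo_winding():
    tri = [{"lon": 0.0, "lat": 0.0}, {"lon": 0.0, "lat": 1.0}, {"lon": 1.0, "lat": 0.0}, {"lon": 0.0, "lat": 0.0}]
    return sum([(tri[i+1]["lon"] - tri[i]["lon"]) * (tri[i+1]["lat"] + tri[i]["lat"]) for i in range(len(tri) - 1)]) # 1.0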

#
#  Create Empty GeoDataFrame
#
def emptyframe(**kwargs):
    # force the coordinate reference system keyword argument
    kwargs['crs'] = SLIDERULE_EPSG
    return geopandas.GeoDataFrame(geometry=geopandas.points_from_xy([], [], []), crs=kwargs['crs'])

#
# Process Output File
#
def procoutputfile(parm, rsps):
    output = parm["output"]
    path = output["path"]
    # Check If Remote Record Is In Responses
    for rsp in rsps:
        if 'arrowrec.remote' == rsp['__rectype']:
            path = rsp['url']
            break
    # Handle Local Files
    if "open_on_complete" in output and output["open_on_complete"]:
        if output["format"] == "parquet":
            if "as_geo" in output and not output["as_geo"]:
                # Return Parquet File as DataFrame
                return geopandas.pd.read_parquet(path)
            else:
                # Return GeoParquet File as GeoDataFrame
                return geopandas.read_parquet(path)
        elif output["format"] == "csv":
            # Return CSV File as DataFrame
            return geopandas.pd.read_csv(path)
        else:
            raise FatalError('unsupported output format: %s' % (output["format"]))
    else:
        # Return Parquet Filename
        return path

#
#  Get Values from Raw Buffer
#
def getvalues(data, dtype, size, num_elements=0):
    """
    data:   tuple of bytes
    dtype:  element of codedtype
    size:   bytes in data
    """

    raw = bytes(data)
    datatype = basictypes[codedtype2str[dtype]]["nptype"]
    if num_elements == 0: # dynamically determine number of elements
        num_elements = int(size / numpy.dtype(datatype).itemsize)
    slicesize = num_elements * numpy.dtype(datatype).itemsize # truncates partial bytes
    values = numpy.frombuffer(raw[:slicesize], dtype=datatype, count=num_elements)

    return values
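
#
# Illustrative sketch (not part of the client API): packs three 32-bit floats
# in native byte order and recovers them with getvalues; coded type 9 maps to
# "FLOAT" in codedtype2str.
#
def __demo_getvalues():
    raw = struct.pack('=3f', 1.0, 2.0, 3.0)
    return getvalues(raw, 9, len(raw)) # array([1., 2., 3.], dtype=float32)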

#
#  Dictionary to GeoDataFrame
#
def todataframe(columns, time_key="time", lon_key="longitude", lat_key="latitude", height_key=None, **kwargs):

    # Latch Start Time
    tstart = time.perf_counter()

    # Force Index and CRS Keyword Arguments
    kwargs['index_key'] = "time"
    kwargs['crs'] = SLIDERULE_EPSG

    # Check Empty Columns
    if len(columns) <= 0:
        return emptyframe(**kwargs)

    # Generate Time Column
    columns['time'] = columns[time_key].astype('datetime64[ns]')

    # Generate Geometry Column
    # 3D point geometry
    # This enables 3D CRS transformations using the to_crs() method
    if height_key is None:
        geometry = geopandas.points_from_xy(columns[lon_key], columns[lat_key])
    else:
        geometry = geopandas.points_from_xy(columns[lon_key], columns[lat_key], columns[height_key])
    del columns[lon_key]
    del columns[lat_key]

    # Create Pandas DataFrame object
    if type(columns) == dict:
        df = geopandas.pd.DataFrame(columns)
    else:
        df = columns

    # Build GeoDataFrame (default geometry is crs=SLIDERULE_EPSG)
    gdf = geopandas.GeoDataFrame(df, geometry=geometry, crs=kwargs['crs'])

    # Set index (default is Timestamp), can add `verify_integrity=True` to check for duplicates
    # Can do this during DataFrame creation, but this allows input argument for desired column
    gdf.set_index(kwargs['index_key'], inplace=True)

    # Sort values for reproducible output despite async processing
    gdf.sort_index(inplace=True)

    # Update Profile
    profiles[todataframe.__name__] = time.perf_counter() - tstart

    # Return GeoDataFrame
    return gdf
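
#
# Illustrative sketch (not part of the client API): builds a two-point
# GeoDataFrame from hypothetical columns; "time" values are nanoseconds since
# the Unix epoch and become the sorted index.
#
def __demo_todataframe():
    columns = {
        "time":      numpy.array([1500000001000000000, 1500000000000000000], dtype=numpy.int64),
        "longitude": numpy.array([-108.1, -108.2]),
        "latitude":  numpy.array([38.9, 38.8]),
        "height":    numpy.array([1501.0, 1500.0])
    }
    return todataframe(columns, height_key="height")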

###############################################################################
# APIs
###############################################################################

#
#  Initialize
#
def init (url=PUBLIC_URL, verbose=False, loglevel=logging.INFO, organization=0, desired_nodes=None, time_to_live=60, bypass_dns=False, plugins=[]):
    '''
    Initializes the Python client for use with SlideRule, and should be called before other ICESat-2 API calls.
    This function is a wrapper for a handful of sliderule functions that would otherwise all have to be called in order to initialize the client.

    Parameters
    ----------
    url: str
        the IP address or hostname of the SlideRule service (slideruleearth.io by default)
    verbose: bool
        sets up console logger as a convenience to user so all logs are printed to screen
    loglevel: int
        minimum severity of log message to output
    organization: str
        SlideRule provisioning system organization the user belongs to (see sliderule.authenticate for details)
    desired_nodes: int
        requested number of processing nodes in the cluster
    time_to_live: int
        minimum number of minutes the desired number of nodes should be present in the cluster
    bypass_dns: bool
        if true then the ip address for the cluster is retrieved from the provisioning system and used directly
    plugins: list
        names of the plugins that need to be available on the server

    Returns
    -------
    bool
        status of version check

    Examples
    --------
        >>> import sliderule
        >>> sliderule.init()
    '''
    # massage function parameters
    if organization == 0:
        organization = PUBLIC_ORG
    # configure client
    set_verbose(verbose, loglevel)
    set_url(url) # configure domain
    authenticate(organization) # configure credentials (if any) for organization
    scaleout(desired_nodes, time_to_live, bypass_dns) # set cluster to desired number of nodes (if permitted based on credentials)
    return check_version(plugins=plugins) # verify compatibility between client and server versions

#
#  source
#
def source (api, parm={}, stream=False, callbacks={}, path="/source", silence=False):
    '''
    Perform API call to SlideRule service

    Parameters
    ----------
    api: str
        name of the SlideRule endpoint
    parm: dict
        dictionary of request parameters
    stream: bool
        whether the request is a **normal** service or a **stream** service (see `De-serialization </web/rtd/user_guide/SlideRule.html#de-serialization>`_ for more details)
    callbacks: dict
        record type callbacks (advanced use)
    path: str
        path to api being requested
    silence: bool
        whether error log messages should be generated

    Returns
    -------
    dictionary
        response data

    Examples
    --------
        >>> import sliderule
        >>> sliderule.set_url("slideruleearth.io")
        >>> rqst = {
        ...     "time": "NOW",
        ...     "input": "NOW",
        ...     "output": "GPS"
        ... }
        >>> rsps = sliderule.source("time", rqst)
        >>> print(rsps)
        {'time': 1300556199523.0, 'format': 'GPS'}
    '''
    global service_url, service_org
    rqst = json.dumps(parm)
    rsps = {}
    headers = None
    # Build Callbacks
    for c in __callbacks:
        if c not in callbacks:
            callbacks[c] = __callbacks[c]
    # Attempt Request
    complete = False
    attempts = 3
    while not complete and attempts > 0:
        attempts -= 1
        try:
            # Construct Request URL and Authorization
            if service_org:
                url = 'https://%s.%s%s/%s' % (service_org, service_url, path, api)
                headers = buildauthheader()
            else:
                url = 'http://%s%s/%s' % (service_url, path, api)
            # Perform Request
            if not stream:
                data = session.get(url, data=rqst, headers=headers, timeout=request_timeout)
            else:
                data = session.post(url, data=rqst, headers=headers, timeout=request_timeout, stream=True)
            data.raise_for_status()
            # Parse Response
            stream = __StreamSource(data)
            format = data.headers['Content-Type']
            if format == 'text/plain':
                rsps = __parse_json(stream)
            elif format == 'application/json':
                rsps = __parse_json(stream)
            elif format == 'application/octet-stream':
                rsps = __parse_native(stream, callbacks)
            else:
                raise FatalError('unsupported content type: %s' % (format))
            # Success
            complete = True
        except requests.exceptions.SSLError as e:
            logger.debug("Exception in request to {}: {}".format(url, e))
            if not silence:
                logger.error("Unable to verify SSL certificate for {} ...retrying request".format(url))
        except requests.ConnectionError as e:
            logger.debug("Exception in request to {}: {}".format(url, e))
            if not silence:
                logger.error("Connection error to endpoint {} ...retrying request".format(url))
        except requests.Timeout as e:
            logger.debug("Exception in request to {}: {}".format(url, e))
            if not silence:
                logger.error("Timed-out waiting for response from endpoint {} ...retrying request".format(url))
        except requests.exceptions.ChunkedEncodingError as e:
            logger.debug("Exception in request to {}: {}".format(url, e))
            if not silence:
                logger.error("Unexpected termination of response from endpoint {} ...retrying request".format(url))
        except requests.HTTPError as e:
            logger.debug("Exception in request to {}: {}".format(url, e))
            if e.response.status_code == 503:
                raise TransientError("Server experiencing heavy load, stalling on request to {}".format(url))
            else:
                raise FatalError("HTTP error {} from endpoint {}".format(e.response.status_code, url))
        except:
            raise
    # Check Success
    if not complete:
        raise FatalError("Unable to complete request due to errors")
    # Return Response
    return rsps
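
#
# Illustrative sketch (not part of the client API): issues a streamed request
# and routes one record type to a user callback instead of collecting it in
# the returned list. The "atl03s" endpoint, "atl03rec" record type, and empty
# parameter dictionary are placeholders for a real processing request.
#
def __demo_stream_callback():
    def on_atl03rec(rec):
        print(rec["__rectype"])
    return source("atl03s", parm={}, stream=True, callbacks={"atl03rec": on_atl03rec})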

#
#  set_url
#
def set_url (url):
    '''
    Configure sliderule package with URL of service

    Parameters
    ----------
    url: str
        IP address or hostname of SlideRule service (note, there is a special case where the url is provided as a list of strings
        instead of just a string; when a list is provided, the client hardcodes the set of servers that are used to process requests
        to the exact set provided; this is used for testing and for local installations and can be ignored by most users)

    Examples
    --------
        >>> import sliderule
        >>> sliderule.set_url("service.my-sliderule-server.org")
    '''
    global service_url
    service_url = url

#
#  set_verbose
#
def set_verbose (enable, loglevel=logging.INFO):
    '''
    Sets up a console logger to print log messages to screen

    If you want more control over the behavior of the log messages being captured, do not call this function
    but instead create and configure a Python log handler of your own and attach it to `sliderule.logger`.

    Parameters
    ----------
    enable: bool
        True: creates console logger if it doesn't exist, False: destroys console logger if it does exist
    loglevel: int
        minimum severity of log message to output

    Examples
    --------
        >>> import sliderule
        >>> sliderule.set_verbose(True, loglevel=logging.INFO)
    '''
    global console, logger
    # massage loglevel parameter if passed in as a string
    if loglevel == "DEBUG":
        loglevel = logging.DEBUG
    elif loglevel == "INFO":
        loglevel = logging.INFO
    elif loglevel == "WARNING":
        loglevel = logging.WARNING
    elif loglevel == "WARN":
        loglevel = logging.WARN
    elif loglevel == "ERROR":
        loglevel = logging.ERROR
    elif loglevel == "FATAL":
        loglevel = logging.FATAL
    elif loglevel == "CRITICAL":
        loglevel = logging.CRITICAL
    # enable/disable logging to console
    if (enable == True) and (console == None):
        console = logging.StreamHandler()
        logger.addHandler(console)
    elif (enable == False) and (console != None):
        logger.removeHandler(console)
        console = None
    # always set level to requested
    logger.setLevel(loglevel)
    if console != None:
        console.setLevel(loglevel)

#
#  set_rqst_timeout
#
def set_rqst_timeout (timeout):
    '''
    Sets the TCP/IP connection and reading timeouts for future requests made to sliderule servers.
    Setting it lower means the client will failover more quickly, but may generate false positives
    if a processing request stalls or takes a long time returning data.  Setting it higher means the
    client will wait longer before designating a request failed, which in the presence of a persistent
    failure means it will take longer for the client to remove the node from its available servers list.

    Parameters
    ----------
    timeout: tuple
        (<connection timeout in seconds>, <read timeout in seconds>)

    Examples
    --------
        >>> import sliderule
        >>> sliderule.set_rqst_timeout((10, 60))
    '''
    global request_timeout
    if type(timeout) == tuple:
        request_timeout = timeout
    else:
        raise FatalError('timeout must be a tuple (<connection timeout>, <read timeout>)')

#
#  set_processing_flags
#
def set_processing_flags (aux=True):
    '''
    Sets flags used when processing the record definitions

    Parameters
    ----------
    aux: bool
        decode auxiliary fields

    Examples
    --------
        >>> import sliderule
        >>> sliderule.set_processing_flags(aux=False)
    '''
    global decode_aux
    if type(aux) == bool:
        decode_aux = aux
    else:
        raise FatalError('aux must be a boolean')

#
#  update_available_servers
#
def update_available_servers (desired_nodes=None, time_to_live=None):
    '''
    Manages the number of servers in the cluster.
    If the desired_nodes parameter is set, then a request is made to change the number of servers in the cluster to the number specified.
    In all cases, the number of nodes currently running in the cluster is returned - even if desired_nodes is set;
    subsequent calls to this function are needed to check when the current number of nodes matches the desired_nodes.

    Parameters
    ----------
    desired_nodes: int
        the desired number of nodes in the cluster
    time_to_live: int
        number of minutes for the desired nodes to run

    Returns
    -------
    int
        number of nodes currently in the cluster
    int
        number of nodes available for work in the cluster

    Examples
    --------
        >>> import sliderule
        >>> num_servers, max_workers = sliderule.update_available_servers(10)
    '''
    global service_url, service_org, request_timeout

    requested_nodes = 0

    # Update number of nodes
    if type(desired_nodes) == int:
        rsps_body = {}
        requested_nodes = desired_nodes
        headers = buildauthheader()
        # Get boundaries of cluster and calculate nodes to request
        try:
            host = "https://ps." + service_url + "/api/org_num_nodes/" + service_org + "/"
            rsps = session.get(host, headers=headers, timeout=request_timeout)
            rsps_body = rsps.json()
            rsps.raise_for_status()
            requested_nodes = max(min(desired_nodes, rsps_body["max_nodes"]), rsps_body["min_nodes"])
            if requested_nodes != desired_nodes:
                logger.info("Provisioning system desired nodes overridden to {}".format(requested_nodes))
        except requests.exceptions.HTTPError as e:
            logger.info('{}'.format(e))
            logger.info('Provisioning system status request returned error => {}'.format(rsps_body["error_msg"]))
        # Request number of nodes in cluster
        try:
            if type(time_to_live) == int:
                host = "https://ps." + service_url + "/api/desired_org_num_nodes_ttl/" + service_org + "/" + str(requested_nodes) + "/" + str(time_to_live) + "/"
                rsps = session.post(host, headers=headers, timeout=request_timeout)
            else:
                host = "https://ps." + service_url + "/api/desired_org_num_nodes/" + service_org + "/" + str(requested_nodes) + "/"
                rsps = session.put(host, headers=headers, timeout=request_timeout)
            rsps_body = rsps.json()
            rsps.raise_for_status()
        except requests.exceptions.HTTPError as e:
            logger.info('{}'.format(e))
            logger.error('Provisioning system update request error => {}'.format(rsps_body["error_msg"]))

    # Get number of nodes currently registered
    try:
        rsps = source("status", parm={"service":"sliderule"}, path="/discovery", silence=True)
        available_servers = rsps["nodes"]
    except FatalError as e:
        logger.debug("Failed to retrieve number of nodes registered: {}".format(e))
        available_servers = 0

    return available_servers, requested_nodes

#
#  scaleout
#
def scaleout(desired_nodes, time_to_live, bypass_dns):
    '''
    Scale the cluster and wait for the cluster to reach the desired state

    Parameters
    ----------
    desired_nodes: int
        the desired number of nodes in the cluster
    time_to_live: int
        number of minutes for the desired nodes to run
    bypass_dns: bool
        if true then the cluster ip address is retrieved from the provisioning system and used directly

    Examples
    --------
        >>> import sliderule
        >>> sliderule.scaleout(4, 300, False)
    '''
    if desired_nodes is None:
        return # nothing needs to be done

    if desired_nodes < 0:
        raise FatalError("Number of desired nodes cannot be negative ({})".format(desired_nodes))

    # Initialize DNS
    __initdns() # clear cache of DNS lookups for clusters
    if bypass_dns:
        __jamdns() # use ip address for cluster

    # Send Initial Request for Desired Cluster State
    start = time.time()
    available_nodes,requested_nodes = update_available_servers(desired_nodes=desired_nodes, time_to_live=time_to_live)
    scale_up_needed = False

    # Wait for Cluster to Reach Desired State
    while available_nodes < requested_nodes:
        scale_up_needed = True
        logger.info("Waiting while cluster scales to desired capacity (currently at {} nodes, desired is {} nodes)... {} seconds".format(available_nodes, desired_nodes, int(time.time() - start)))
        time.sleep(10.0)
        available_nodes,_ = update_available_servers()
        # Override DNS if Cluster is Starting
        if available_nodes == 0 and not __dnsoverridden():
            __jamdns()
        # Timeout Occurred
        if int(time.time() - start) > MAX_PS_CLUSTER_WAIT_SECS:
            logger.error("Maximum time allowed waiting for cluster has been exceeded")
            break

    # Log Final Message if Cluster Needed State Change
    if scale_up_needed:
        logger.info("Cluster has reached capacity of {} nodes... {} seconds".format(available_nodes, int(time.time() - start)))

#
#  authenticate
#
def authenticate (ps_organization, ps_username=None, ps_password=None, github_token=None):
    '''
    Authenticate to SlideRule Provisioning System
    The username and password can be provided in the following ways, in order of priority:
    (1) the passed in arguments `github_token`, or `ps_username` and `ps_password`;
    (2) the O.S. environment variables `PS_GITHUB_TOKEN`, or `PS_USERNAME` and `PS_PASSWORD`;
    (3) the `ps.<url>` entry in the .netrc file in your home directory

    Parameters
    ----------
    ps_organization: str
        name of the SlideRule organization the user belongs to
    ps_username: str
        SlideRule provisioning system account name
    ps_password: str
        SlideRule provisioning system account password
    github_token: str
        GitHub access token (minimum scope/permissions required)

    Returns
    -------
    status
        True if successful, False if unsuccessful

    Examples
    --------
        >>> import sliderule
        >>> sliderule.authenticate("myorg")
        True
    '''
    global service_org, ps_refresh_token, ps_access_token, ps_token_exp
    login_status = False
    ps_url = "ps." + service_url

    # set organization on any authentication request
    service_org = ps_organization

    # check for direct or public access
    if service_org == None:
        return True

    # attempt retrieving from environment
    if not github_token and not ps_username and not ps_password:
        github_token = os.environ.get("PS_GITHUB_TOKEN")
        ps_username = os.environ.get("PS_USERNAME")
        ps_password = os.environ.get("PS_PASSWORD")

    # attempt retrieving from netrc file
    if not github_token and not ps_username and not ps_password:
        try:
            netrc_file = netrc.netrc()
            login_credentials = netrc_file.hosts[ps_url]
            ps_username = login_credentials[0]
            ps_password = login_credentials[2]
        except Exception as e:
            if ps_organization != PUBLIC_ORG:
                logger.warning("Unable to retrieve username and password from netrc file for machine: {}".format(e))

    # build authentication request
    user = None
    if github_token:
        user = "github"
        rqst = {"org_name": ps_organization, "access_token": github_token}
        headers = {'Content-Type': 'application/json'}
        api = "https://" + ps_url + "/api/org_token_github/"
    elif ps_username or ps_password:
        user = "local"
        rqst = {"username": ps_username, "password": ps_password, "org_name": ps_organization}
        headers = {'Content-Type': 'application/json'}
        api = "https://" + ps_url + "/api/org_token/"

    # authenticate to provisioning system
    if user:
        try:
            rsps = session.post(api, data=json.dumps(rqst), headers=headers, timeout=request_timeout)
            rsps.raise_for_status()
            rsps = rsps.json()
            ps_refresh_token = rsps["refresh"]
            ps_access_token = rsps["access"]
            ps_token_exp = time.time() + (float(rsps["access_lifetime"]) / 2)
            login_status = True
        except:
            if ps_organization != PUBLIC_ORG:
                logger.error("Unable to authenticate %s user to %s" % (user, api))

    # return login status
    return login_status

#
#  gps2utc
#
def gps2utc (gps_time, as_str=True):
    '''
    Convert a GPS based time returned from SlideRule into a UTC time.

    Parameters
    ----------
    gps_time: float
        number of seconds since GPS epoch (January 6, 1980)
    as_str: bool
        if True, returns the time as a string; if False, returns the time as a datetime object

    Returns
    -------
    datetime
        UTC time (i.e. GMT, or Zulu time)

    Examples
    --------
        >>> import sliderule
        >>> sliderule.gps2utc(1235331234)
        '2019-02-27 19:34:03'
    '''
    rsps = source("time", {"time": int(gps_time * 1000), "input": "GPS", "output": "DATE"})
    if as_str:
        return rsps["time"]
    else:
        return datetime.strptime(rsps["time"], '%Y-%m-%dT%H:%M:%SZ')

#
#  get_definition
#
def get_definition (rectype, fieldname):
    '''
    Get the underlying format specification of a field in a return record.

    Parameters
    ----------
    rectype: str
        the name of the type of the record (i.e. "atl03rec")
    fieldname: str
        the name of the record field (i.e. "cycle")

    Returns
    -------
    dict
        description of the field; see the `sliderule.basictypes` variable for the different field types

    Examples
    --------
        >>> import sliderule
        >>> sliderule.set_url("slideruleearth.io")
        >>> sliderule.get_definition("atl03rec", "cycle")
        {'fmt': 'H', 'size': 2, 'nptype': <class 'numpy.uint16'>}
    '''
    recdef = __populate(rectype)
    if fieldname in recdef and recdef[fieldname]["type"] in basictypes:
        return basictypes[recdef[fieldname]["type"]]
    else:
        return {}

#
#  get_version
#
def get_version ():
    '''
    Get the version information for the running servers and Python client

    Returns
    -------
    dict
        dictionary of version information
    '''
    global service_org
    rsps = source("version", {})
    rsps["client"] = {"version": version.full_version}
    rsps["organization"] = service_org
    return rsps

#
#  check_version
#
def check_version (plugins=[]):
    '''
    Check that the version of the client matches the version of the server and any additionally requested plugins

    Parameters
    ----------
    plugins: list
        list of package names (as strings) to check the version on

    Returns
    -------
    bool
        True if at least the minor version matches; False if the major or minor version doesn't match
    '''
    status = True
    info = get_version() # populate version info
    versions = {}
    for entity in ['server', 'client'] + plugins:
        s = info[entity]['version'][1:].split('.')
        versions[entity] = (int(s[0]), int(s[1]), int(s[2]))
    # check major version mismatches
    if versions['server'][0] != versions['client'][0]:
        raise RuntimeError("Client (version {}) is incompatible with the server (version {})".format(versions['client'], versions['server']))
    else:
        for pkg in plugins:
            if versions[pkg][0] != versions['client'][0]:
                raise RuntimeError("Client (version {}) is incompatible with the {} plugin (version {})".format(versions['client'], pkg, versions[pkg]))
    # check minor version mismatches
    if versions['server'][1] > versions['client'][1]:
        logger.warning("Client (version {}) is out of date with the server (version {})".format(versions['client'], versions['server']))
        status = False
    else:
        for pkg in plugins:
            if versions[pkg][1] > versions['client'][1]:
                logger.warning("Client (version {}) is out of date with the {} plugin (version {})".format(versions['client'], pkg, versions[pkg]))
                status = False
    # return if version check is successful
    return status

#
#  Format Region Specification
#
def toregion(source, tolerance=0.0, cellsize=0.01, n_clusters=1):
    '''
    Convert a GeoJSON/Shapefile/GeoDataFrame/list representation of a set of geospatial regions
    into a list of lat,lon coordinates and raster image recognized by SlideRule

    Parameters
    ----------
    source: str
        file name of GeoJSON formatted regions of interest, file **must** have name with the .geojson suffix
        file name of ESRI Shapefile formatted regions of interest, file **must** have name with the .shp suffix
        GeoDataFrame of region of interest
        list of latitude,longitude pairs forming a polygon (e.g. [lat1, lon1, lat2, lon2, lat3, lon3, lat1, lon1])
        list of latitude,longitude pairs forming a bounding box (e.g. [lat1, lon1, lat2, lon2])
    tolerance: float
        tolerance used to simplify complex shapes so that the number of points is less than the limit (a tolerance of 0.001 typically works for most complex shapes)
    cellsize: float
        size of pixel in degrees used to create the raster image of the polygon
    n_clusters: int
        number of clusters of polygons to create when breaking up the request to CMR

    Returns
    -------
    dict
        a dictionary containing the region of interest as a list of longitudes and latitudes, and as a raster image,
        that can be used for the **poly** and **raster** parameters in a processing request to SlideRule.

        region = {
            "gdf": <GeoDataFrame of region>,
            "poly": [{"lat": <lat1>, "lon": <lon1>}, ...],
            "clusters": [[{"lat": <lat1>, "lon": <lon1>}, ...], [{"lat": <lat1>, "lon": <lon1>}, ...], ...],
            "raster": {
                "data": <geojson file as string>,
                "length": <geojson file length>,
                "cellsize": <cellsize in degrees>
            }
        }

    Examples
    --------
        >>> import sliderule, json
        >>> region = sliderule.toregion("tests/data/grandmesa.geojson")
        >>> print(json.dumps(region["poly"], indent=4))
        [
            {
                "lon": -108.20772968780051,
                "lat": 38.8232055291981
            },
            {
                "lon": -108.07460164311031,
                "lat": 38.8475137825863
            },
            {
                "lon": -107.72839858755752,
                "lat": 39.01510930230633
            },
            {
                "lon": -107.78724142490994,
                "lat": 39.195630349659986
            },
            {
                "lon": -108.17287000970857,
                "lat": 39.15920066396116
            },
            {
                "lon": -108.31168256553767,
                "lat": 39.13757646212944
            },
            {
                "lon": -108.34115668325224,
                "lat": 39.03758987613325
            },
            {
                "lon": -108.2878686387796,
                "lat": 38.89051431295789
            },
            {
                "lon": -108.20772968780051,
                "lat": 38.8232055291981
            }
        ]
    '''
    tstart = time.perf_counter()
    tempfile = "temp.geojson"

    if isinstance(source, geopandas.GeoDataFrame):
        # user provided GeoDataFrame instead of a file
        gdf = source
        # Convert to geojson file
        gdf.to_file(tempfile, driver="GeoJSON")
        with open(tempfile, mode='rt') as file:
            datafile = file.read()
        os.remove(tempfile)

    elif isinstance(source, Polygon):
        # user provided shapely Polygon instead of a file
        gdf = geopandas.GeoDataFrame(geometry=[source], crs=EPSG_WGS84)
        # Convert to geojson file
        gdf.to_file(tempfile, driver="GeoJSON")
        with open(tempfile, mode='rt') as file:
            datafile = file.read()
        os.remove(tempfile)

    elif isinstance(source, list) and (len(source) >= 4) and (len(source) % 2 == 0):
        # create lat/lon lists
        if len(source) == 4: # bounding box
            lons = [source[0], source[2], source[2], source[0], source[0]]
            lats = [source[1], source[1], source[3], source[3], source[1]]
        elif len(source) > 4: # polygon list
            lons = [source[i] for i in range(1,len(source),2)]
            lats = [source[i] for i in range(0,len(source),2)]
        # create geodataframe
        p = Polygon([point for point in zip(lons, lats)])
        gdf = geopandas.GeoDataFrame(geometry=[p], crs=EPSG_WGS84)
        # Convert to geojson file
        gdf.to_file(tempfile, driver="GeoJSON")
        with open(tempfile, mode='rt') as file:
            datafile = file.read()
        os.remove(tempfile)

    elif isinstance(source, str) and (source.find(".shp") > 1):
        # create geodataframe
        gdf = geopandas.read_file(source)
        # Convert to geojson file
        gdf.to_file(tempfile, driver="GeoJSON")
        with open(tempfile, mode='rt') as file:
            datafile = file.read()
        os.remove(tempfile)

    elif isinstance(source, str) and (source.find(".geojson") > 1):
        # create geodataframe
        gdf = geopandas.read_file(source)
        with open(source, mode='rt') as file:
            datafile = file.read()

    else:
        raise FatalError("incorrect filetype: please use a .geojson, .shp, or a geodataframe")

    # If user provided raster we don't have gdf, geopandas cannot easily convert it
    polygon = clusters = None
    if gdf is not None:

        # simplify polygon
        if(tolerance > 0.0):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                gdf = gdf.buffer(tolerance)
                gdf = gdf.simplify(tolerance)

        # generate polygon
        polygon = gdf2poly(gdf)

        # generate clusters
        clusters = []
        if n_clusters > 1:
            if clustering_enabled:
                # pull out centroids of each geometry object
                if "CenLon" in gdf and "CenLat" in gdf:
                    X = numpy.column_stack((gdf["CenLon"], gdf["CenLat"]))
                else:
                    s = gdf.centroid
                    X = numpy.column_stack((s.x, s.y))
                # run k means clustering algorithm against polygons in gdf
                kmeans = KMeans(n_clusters=n_clusters, init='k-means++', random_state=5, max_iter=400)
                y_kmeans = kmeans.fit_predict(X)
                k = geopandas.pd.DataFrame(y_kmeans, columns=['cluster'])
                gdf = gdf.join(k)
                # build polygon for each cluster
                for n in range(n_clusters):
                    c_gdf = gdf[gdf["cluster"] == n]
                    c_poly = gdf2poly(c_gdf)
                    clusters.append(c_poly)
            else:
                raise FatalError("Clustering support not enabled; unable to import sklearn package")

    # update timing profiles
    profiles[toregion.__name__] = time.perf_counter() - tstart

    # return region
    return {
        "gdf": gdf,                  # GeoDataFrame of region
        "poly": polygon,             # convex hull of polygons
        "clusters": clusters,        # list of polygon clusters for cmr request
        "raster": {
            "data": datafile,        # geojson file
            "length": len(datafile), # geojson file length
            "cellsize": cellsize     # units are in crs/projection
        }
    }