# Source code for astroquery.casda.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst

# 1. standard library imports
from io import BytesIO
import os
from urllib.parse import unquote, urlparse
import time
from xml.etree import ElementTree
from datetime import datetime, timezone

# 2. third party imports
import astropy.units as u
import astropy.coordinates as coord
from astropy.table import Table
from astropy.io.votable import parse
from astroquery import log

# 3. local imports - use relative imports
# commonly required local imports shown below as example
# all Query classes should inherit from BaseQuery.
from ..query import BaseQuery
# has common functions required by most modules
from ..utils import commons
# prepend_docstr is a way to copy docstrings between methods
from ..utils import prepend_docstr_nosections
# async_to_sync generates the relevant query tools from _async methods
from ..utils import async_to_sync
# import configurable items declared in __init__.py
from . import conf

# export all the public classes and methods
__all__ = ['Casda', 'CasdaClass']


@async_to_sync
class CasdaClass(BaseQuery):
    """
    Class for accessing ASKAP data through the CSIRO ASKAP Science Data
    Archive (CASDA). Typical usage:

    result = Casda.query_region('22h15m38.2s -45d50m30.5s', radius=0.5 * u.deg)
    """

    # use the Configuration Items imported from __init__.py to set the URL,
    # TIMEOUT, etc.
    URL = conf.server
    TIMEOUT = conf.timeout
    POLL_INTERVAL = conf.poll_interval
    _soda_base_url = conf.soda_base_url
    # Namespace map used when querying UWS (Universal Worker Service) job XML
    _uws_ns = {'uws': 'http://www.ivoa.net/xml/UWS/v1.0'}

    def __init__(self, user=None, password=None):
        """
        Parameters
        ----------
        user : str, optional
            OPAL username; if omitted the instance is unauthenticated and
            cannot stage or download data.
        password : str, optional
            OPAL password matching ``user``.
        """
        super().__init__()
        if user is None:
            self._authenticated = False
        else:
            self._authenticated = True
            # Credentials are kept as a (user, password) tuple suitable for
            # the ``auth`` argument of requests-based calls.
            self._auth = (user, password)
[docs] def query_region_async(self, coordinates, radius=None, height=None, width=None, get_query_payload=False, cache=True): """ Queries a region around the specified coordinates. Either a radius or both a height and a width must be provided. Parameters ---------- coordinates : str or `astropy.coordinates`. coordinates around which to query radius : str or `astropy.units.Quantity`. the radius of the cone search width : str or `astropy.units.Quantity` the width for a box region height : str or `astropy.units.Quantity` the height for a box region get_query_payload : bool, optional Just return the dict of HTTP request parameters. cache: bool, optional Use the astroquery internal query result cache Returns ------- response : `requests.Response` The HTTP response returned from the service. All async methods should return the raw HTTP response. """ request_payload = self._args_to_payload(coordinates=coordinates, radius=radius, height=height, width=width) if get_query_payload: return request_payload response = self._request('GET', self.URL, params=request_payload, timeout=self.TIMEOUT, cache=cache) # result = self._parse_result(response) return response
# Create the dict of HTTP request parameters by parsing the user # entered values. def _args_to_payload(self, **kwargs): request_payload = dict() # Convert the coordinates to FK5 coordinates = kwargs.get('coordinates') fk5_coords = commons.parse_coordinates(coordinates).transform_to(coord.FK5) if kwargs['radius'] is not None: radius = u.Quantity(kwargs['radius']).to(u.deg) pos = 'CIRCLE {} {} {}'.format(fk5_coords.ra.degree, fk5_coords.dec.degree, radius.value) elif kwargs['width'] is not None and kwargs['height'] is not None: width = u.Quantity(kwargs['width']).to(u.deg).value height = u.Quantity(kwargs['height']).to(u.deg).value top = fk5_coords.dec.degree - (height/2) bottom = fk5_coords.dec.degree + (height/2) left = fk5_coords.ra.degree - (width/2) right = fk5_coords.ra.degree + (width/2) pos = 'RANGE {} {} {} {}'.format(left, right, top, bottom) else: raise ValueError("Either 'radius' or both 'height' and 'width' must be supplied.") request_payload['POS'] = pos return request_payload # the methods above implicitly call the private _parse_result method. # This should parse the raw HTTP response and return it as # an `astropy.table.Table`. def _parse_result(self, response, verbose=False): # if verbose is False then suppress any VOTable related warnings if not verbose: commons.suppress_vo_warnings() # try to parse the result into an astropy.Table, else # return the raw result with an informative error message. try: # do something with regex to get the result into # astropy.Table form. return the Table. data = BytesIO(response.content) table = Table.read(data) return table except ValueError as e: # catch common errors here, but never use bare excepts # return raw result/ handle in some way log.info("Failed to convert query result to table", e) return response
[docs] def filter_out_unreleased(self, table): """ Return a subset of the table which only includes released (public) data. Parameters ---------- table: `astropy.table.Table` A table of results as returned by query_region. Must include an obs_release_date column. Returns ------- table : `astropy.table.Table` The table with all unreleased (non public) data products filtered out. """ now = str(datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')) return table[(table['obs_release_date'] != '') & (table['obs_release_date'] < now)]
[docs] def stage_data(self, table, verbose=False): """ Request access to a set of data files. All requests for data must use authentication. If you have access to the data, the requested files will be brought online and a set of URLs to download the files will be returned. Parameters ---------- table: `astropy.table.Table` A table describing the files to be staged, such as produced by query_region. It must include an access_url column. verbose: bool, optional Should status message be logged periodically, defaults to False Returns ------- A list of urls of both the requested files and the checksums for the files """ if not self._authenticated: raise ValueError("Credentials must be supplied to download CASDA image data") if table is None or len(table) == 0: return [] # Use datalink to get authenticated access for each file tokens = [] soda_url = None for row in table: access_url = row['access_url'] if access_url: response = self._request('GET', access_url, auth=self._auth, timeout=self.TIMEOUT, cache=False) response.raise_for_status() service_url, id_token = self._parse_datalink_for_service_and_id(response, 'async_service') if id_token: tokens.append(id_token) soda_url = service_url # Trap a request with no allowed data if not soda_url: raise ValueError('You do not have access to any of the requested data files.') # Create job to stage all files job_url = self._create_soda_job(tokens, soda_url=soda_url) if verbose: log.info("Created data staging job " + job_url) # Wait for job to be complete final_status = self._run_job(job_url, verbose, poll_interval=self.POLL_INTERVAL) if final_status != 'COMPLETED': if verbose: log.info("Job ended with status " + final_status) raise ValueError('Data staging job did not complete successfully. 
Status was ' + final_status) # Build list of result file urls job_details = self._get_job_details_xml(job_url) fileurls = [] for result in job_details.find("uws:results", self._uws_ns).findall("uws:result", self._uws_ns): file_location = unquote(result.get("{http://www.w3.org/1999/xlink}href")) fileurls.append(file_location) return fileurls
[docs] def download_files(self, urls, savedir=''): """ Download a series of files Parameters ---------- urls: list of strings The list of URLs of the files to be downloaded. savedir: str, optional The directory in which to save the files. Returns ------- A list of the full filenames of the downloaded files. """ # for each url in list, download file and checksum filenames = [] for url in urls: parseResult = urlparse(url) local_filename = unquote(os.path.basename(parseResult.path)) if os.name == 'nt': # Windows doesn't allow special characters in filenames like # ":" so replace them with an underscore local_filename = local_filename.replace(':', '_') local_filepath = os.path.join(savedir or self.cache_location or '.', local_filename) self._download_file(url, local_filepath, timeout=self.TIMEOUT, cache=False) filenames.append(local_filepath) return filenames
def _parse_datalink_for_service_and_id(self, response, service_name): """ Parses a datalink file into a vo table, and returns the async service url and the authenticated id token. Parameters ---------- response: `requests.Response` The datalink query response. service_name: str The name of the service to be utilised. Returns ------- The url of the async service and the authenticated id token of the file. """ data = BytesIO(response.content) votable = parse(data, verify='warn') results = next(resource for resource in votable.resources if resource.type == "results") if results is None: return None results_array = results.tables[0].array async_url = None authenticated_id_token = None # Find the authenticated id token for accessing the image cube for result in results_array: service_def = result['service_def'] if isinstance(service_def, bytes): service_def = service_def.decode("utf8") if service_def == service_name: authenticated_id_token = result['authenticated_id_token'] if isinstance(service_def, bytes): authenticated_id_token = authenticated_id_token.decode("utf8") # Find the async url for resource in votable.resources: if resource.type == "meta": if resource.ID == service_name: for param in resource.params: if param.name == "accessURL": async_url = param.value if isinstance(async_url, bytes): async_url = async_url.decode() return async_url, authenticated_id_token def _create_soda_job(self, authenticated_id_tokens, soda_url=None): """ Creates the async job, returning the url to query the job status and details Parameters ---------- authenticated_id_tokens: list of str A list of tokens identifying the data products to be accessed. soda_url: str, optional The URL to be used to access the soda service. If not provided, the default CASDA one will be used. Returns ------- The url of the SODA job. 
""" id_params = list( map((lambda authenticated_id_token: ('ID', authenticated_id_token)), authenticated_id_tokens)) async_url = soda_url if soda_url else self._get_soda_url() resp = self._request('POST', async_url, params=id_params, cache=False) resp.raise_for_status() return resp.url def _run_job(self, job_location, verbose, poll_interval=20): """ Start an async job (e.g. TAP or SODA) and wait for it to be completed. Parameters ---------- job_location: str The url to query the job status and details verbose: bool Should progress be logged periodically poll_interval: int, optional The number of seconds to wait between checks on the status of the job. Returns ------- The single word final status of the job. Normally COMPLETED or ERROR """ # Start the async job if verbose: log.info("Starting the retrieval job...") self._request('POST', job_location + "/phase", data={'phase': 'RUN'}, cache=False) # Poll until the async job has finished prev_status = None count = 0 job_details = self._get_job_details_xml(job_location) status = self._read_job_status(job_details, verbose) while status == 'EXECUTING' or status == 'QUEUED' or status == 'PENDING': count += 1 if verbose and (status != prev_status or count > 10): log.info("Job is %s, polling every %d seconds." 
% (status, poll_interval)) count = 0 prev_status = status time.sleep(poll_interval) job_details = self._get_job_details_xml(job_location) status = self._read_job_status(job_details, verbose) return status def _get_soda_url(self): return self._soda_base_url + "data/async" def _get_job_details_xml(self, async_job_url): """ Get job details as XML Parameters ---------- async_job_url: str The url to query the job details Returns ------- `xml.etree.ElementTree` The job details object """ response = self._request('GET', async_job_url, cache=False) response.raise_for_status() job_response = response.text return ElementTree.fromstring(job_response) def _read_job_status(self, job_details_xml, verbose): """ Read job status from the job details XML Parameters ---------- job_details_xml: `xml.etree.ElementTree` The SODA job details verbose: bool Should additional information be logged for errors Returns ------- The single word status of the job. e.g. COMPLETED, EXECUTING, ERROR """ status_node = job_details_xml.find("{http://www.ivoa.net/xml/UWS/v1.0}phase") if status_node is None: if verbose: log.info("Unable to find status in status xml:") ElementTree.dump(job_details_xml) raise ValueError('Invalid job status xml received.') status = status_node.text return status
# the default tool for users to interact with is an instance of the Class
# Bug fix: the source collapsed this assignment onto the comment line above,
# which commented out the instantiation entirely.
Casda = CasdaClass()