Source code for dxpy.utils.exec_utils

# Copyright (C) 2013-2016 DNAnexus, Inc.
#
# This file is part of dx-toolkit (DNAnexus platform client libraries).
#
#   Licensed under the Apache License, Version 2.0 (the "License"); you may not
#   use this file except in compliance with the License. You may obtain a copy
#   of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations
#   under the License.

"""
Utilities used in the DNAnexus execution environment and test harness.
"""

from __future__ import print_function, unicode_literals, division, absolute_import

import os, sys, json, re, collections, logging, argparse, string, itertools, subprocess, tempfile
from functools import wraps
from collections import namedtuple
import pipes

import dxpy
from ..compat import USING_PYTHON2, open, Mapping
from ..exceptions import AppInternalError

ENTRY_POINT_TABLE = {}

RUN_COUNT = 0

# Locale-independent version of string.printable
ASCII_PRINTABLE = string.ascii_letters + string.digits + string.punctuation + string.whitespace
def _safe_unicode(o):
    """
    Returns an equivalent unicode object, trying harder to avoid
    dependencies on the Python default encoding.
    """
    def clean(s):
        return u''.join([c if c in ASCII_PRINTABLE else '?' for c in s])
    if USING_PYTHON2:
        try:
            return unicode(o)
        except:
            try:
                s = str(o)
                try:
                    return s.decode("utf-8")
                except:
                    return clean(s[:2048]) + u" [Raw error message: " + unicode(s.encode("hex"), 'utf-8') + u"]"
            except:
                return u"(Unable to decode Python exception message)"
    else:
        return str(o)

def _format_exception_message(e):
    """
    Formats the specified exception.
    """
    # Prevent duplication of "AppError" in places that print "AppError"
    # and then this formatted string
    if isinstance(e, dxpy.AppError):
        return _safe_unicode(e)
    if USING_PYTHON2:
        return unicode(e.__class__.__name__, 'utf-8') + ": " + _safe_unicode(e)
    else:
        return e.__class__.__name__ + ": " + _safe_unicode(e)
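
# Illustrative sketch (not part of the module): on Python 3 these helpers
# reduce to prefixing the exception class name, while the Python 2 path also
# falls back to replacing non-ASCII-printable characters with "?":
#
#     >>> _format_exception_message(ValueError("boom"))
#     'ValueError: boom'
#     >>> _format_exception_message(dxpy.AppError("bad input"))  # no prefix
#     'bad input'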


def run(function_name=None, function_input=None):
    """Triggers the execution environment entry point processor.

    Use this function in the program entry point code:

    .. code-block:: python

       import dxpy

       @dxpy.entry_point('main')
       def hello(i):
           pass

       dxpy.run()

    This function may be used to invoke the program either in a production
    environment (inside the execution environment) or for local debugging (in
    the debug harness), as follows:

    If the environment variable *DX_JOB_ID* is set, the processor retrieves
    the job with that ID from the API server. The job's *function* field
    indicates the function name to be invoked. That function name is looked up
    in the table of all methods decorated with *@dxpy.entry_point('name')* in
    the module from which :func:`run()` was called, and the matching method is
    invoked with the job's input supplied as parameters. This is the mode of
    operation used in the DNAnexus execution environment.

    .. warning:: The parameters *function_name* and *function_input* are
       disregarded in this mode of operation.

    If the environment variable *DX_JOB_ID* is not set, the function name may
    be given in *function_name*; if not given, it is taken from the
    environment variable *DX_TEST_FUNCTION*. The function input may be given
    in *function_input*; if not given, it is read from the local file
    *job_input.json*, which is expected to be present.

    The absence of *DX_JOB_ID* signals to :func:`run()` that execution is
    happening in the debug harness. In this mode of operation, all calls to
    :func:`dxpy.bindings.dxjob.new_dxjob` (and the higher-level handler
    methods which use it) are intercepted, and :func:`run()` is invoked
    instead with appropriate inputs.
    """
    global RUN_COUNT
    RUN_COUNT += 1

    dx_working_dir = os.getcwd()

    if dxpy.JOB_ID is not None:
        logging.basicConfig()
        try:
            logging.getLogger().addHandler(dxpy.DXLogHandler())
        except dxpy.exceptions.DXError:
            print("TODO: FIXME: the EE client should die if logging is not available")
        job = dxpy.describe(dxpy.JOB_ID)
    else:
        if function_name is None:
            function_name = os.environ.get('DX_TEST_FUNCTION', 'main')
        if function_input is None:
            with open("job_input.json", "r") as fh:
                function_input = json.load(fh)
        job = {'function': function_name, 'input': function_input}

    with open("job_error_reserved_space", "w") as fh:
        fh.write("This file contains reserved space for writing job errors in case the filesystem becomes full.\n"
                 + " " * 1024 * 64)

    print("Invoking", job.get('function'), "with", job.get('input'))

    try:
        result = ENTRY_POINT_TABLE[job['function']](**job['input'])
    except dxpy.AppError as e:
        save_error(e, dx_working_dir, error_type="AppError")
        raise
    except Exception as e:
        save_error(e, dx_working_dir)
        raise

    if result is not None:
        # TODO: protect against the client removing its original working directory
        os.chdir(dx_working_dir)
        if USING_PYTHON2:
            # On Python 2 we need to use binary mode
            with open("job_output.json", "wb") as fh:
                json.dump(result, fh, indent=2, cls=DXJSONEncoder)
                fh.write(b"\n")
        else:
            with open("job_output.json", "w") as fh:
                json.dump(result, fh, indent=2, cls=DXJSONEncoder)
                fh.write("\n")

    return result

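# Minimal local-debugging sketch (not part of the module; the entry point and
# its input below are hypothetical). With DX_JOB_ID unset, run() uses the
# given function name and input instead of consulting the API server:
#
#     import dxpy
#
#     @dxpy.entry_point('main')
#     def main(name="world"):
#         return {"greeting": "hello, " + name}
#
#     dxpy.run(function_name="main", function_input={"name": "DNAnexus"})
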
def save_error(e, working_dir, error_type="AppInternalError"):
    if dxpy.JOB_ID is not None:
        os.chdir(working_dir)
        try:
            os.unlink("job_error_reserved_space")
        except:
            pass
        if USING_PYTHON2:
            # We need to use binary mode on Python 2
            with open("job_error.json", "wb") as fh:
                json.dump({"error": {"type": error_type, "message": _format_exception_message(e)}}, fh)
                fh.write(b"\n")
        else:
            with open("job_error.json", "w") as fh:
                json.dump({"error": {"type": error_type, "message": _format_exception_message(e)}}, fh)
                fh.write("\n")

# TODO: make this less naive with respect to cycles and any other things json.dumps() can handle
def convert_handlers_to_dxlinks(x):
    if isinstance(x, dxpy.DXObject):
        x = dxpy.dxlink(x)
    elif isinstance(x, Mapping):
        for key, value in x.items():
            x[key] = convert_handlers_to_dxlinks(value)
    elif isinstance(x, list):
        for i in range(len(x)):
            x[i] = convert_handlers_to_dxlinks(x[i])
    return x

def parse_args_as_job_input(args, app_spec):
    parser = argparse.ArgumentParser()
    json_inputs = set()
    for ispec in app_spec.get("inputSpec", []):
        kwargs = {}
        if ispec.get("type") == "int":
            kwargs["type"] = int
        elif ispec.get("type") == "float":
            kwargs["type"] = float
        elif ispec.get("type") == "boolean":
            kwargs["type"] = bool
        elif ispec.get("type") != "string":
            json_inputs.add(ispec["name"])
        if ispec.get("optional") is not None:
            kwargs["required"] = not ispec["optional"]
        parser.add_argument("--" + ispec["name"], **kwargs)

    inputs = {}
    for i, value in vars(parser.parse_args(args)).items():
        if value is None:
            continue
        if i in json_inputs:
            try:
                inputs[i] = json.loads(value)
            except ValueError:
                from dxpy.utils.resolver import resolve_existing_path
                project, path, results = resolve_existing_path(value,
                                                               ask_to_resolve=False,
                                                               describe={'id': True},
                                                               allow_mult=False)
                print(project, path, results)
                if results is None or len(results) != 1:
                    raise ValueError("Value {v} could not be resolved".format(v=value))
                inputs[i] = dxpy.dxlink(results[0]['id'], project_id=project)
        else:
            inputs[i] = value
    return inputs

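# Illustrative sketch of parse_args_as_job_input (the input spec below is
# hypothetical): primitive inputs are coerced to their declared types, and
# any other input class is parsed as JSON (or resolved as a platform path):
#
#     app_spec = {"inputSpec": [{"name": "count", "type": "int"},
#                               {"name": "opts", "type": "hash", "optional": True}]}
#     inputs = parse_args_as_job_input(["--count", "3", "--opts", '{"fast": true}'],
#                                      app_spec)
#     # inputs == {"count": 3, "opts": {"fast": True}}
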
def entry_point(entry_point_name):
    """Use this to decorate a DNAnexus execution environment entry point.

    Example:

    .. code-block:: python

       @dxpy.entry_point('main')
       def hello(i):
           pass

    """
    def wrap(f):
        ENTRY_POINT_TABLE[entry_point_name] = f

        @wraps(f)
        def wrapped_f(*args, **kwargs):
            return f(*args, **kwargs)
        return wrapped_f
    return wrap

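# Registration sketch (the entry point name below is hypothetical): the
# decorator records the undecorated function in ENTRY_POINT_TABLE under the
# given name, which run() later uses to dispatch the job's "function" field:
#
#     @entry_point('process')
#     def process(chunk):
#         return {"ok": True}
#
#     ENTRY_POINT_TABLE['process'](chunk=1)   # => {"ok": True}
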
class DXJSONEncoder(json.JSONEncoder):
    """
    Like json.JSONEncoder, but converts DXObject objects into dxlinks.
    """
    def default(self, obj):
        if isinstance(obj, dxpy.DXObject):
            return dxpy.dxlink(obj)
        # Let the base class default method raise the TypeError
        return json.JSONEncoder.default(self, obj)

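# Usage sketch (the file ID below is hypothetical): handler objects embedded
# in a result are serialized as dxlinks rather than raising a TypeError:
#
#     handler = dxpy.DXFile("file-xxxx")
#     json.dumps({"report": handler}, cls=DXJSONEncoder)
#     # => '{"report": {"$dnanexus_link": "file-xxxx"}}'
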
class DXExecDependencyError(AppInternalError):
    pass

class DXExecDependencyInstaller(object):
    """
    Installs dependencies specified by the job. Dependencies are processed in
    the order specified in the bundledDepends, execDepends, and dependencies
    arrays of the runSpec hash (the former two are deprecated). Neighboring
    package dependencies of the same type are grouped.
    """
    group_pms = ("apt", "gem", "cpan", "cran", "pip", "pip3")

    def __init__(self, executable_desc, job_desc, logger=None):
        """
        :param executable_desc: The description of the executable of this job,
            which can be obtained in the response of the /executable-x/describe
            request. This dict must contain the key "runSpec".
        :type executable_desc: dict
        :param job_desc: The description of this job, which can be obtained in
            the response of the /job-x/describe request. If
            ``executable_desc["runSpec"]`` has the key "bundledDependsByRegion",
            then this dict must contain the key "region".
        :type job_desc: dict
        """
        if "runSpec" not in executable_desc:
            raise DXExecDependencyError('Expected field "runSpec" to be present in executable description')
        if "bundledDependsByRegion" in executable_desc["runSpec"] and "region" not in job_desc:
            raise DXExecDependencyError('Expected key "region" in job description')
        self.exec_desc = executable_desc
        self.run_spec = executable_desc["runSpec"]
        self.job_desc = job_desc
        self.stage = self.job_desc.get("function", "main")
        self.logger = logger
        self.dep_groups = []

        for dep in itertools.chain(self._get_local_bundled_dependencies(),
                                   self.run_spec.get("execDepends", []),
                                   self.run_spec.get("dependencies", [])):
            self._validate_dependency(dep)
            dep_type = self._get_dependency_type(dep)

            # Ignore dx-toolkit and jq from execDepends; they do not play nice
            if dep["name"] in ("dx-toolkit", "jq", "python-argcomplete") and dep_type == "apt":
                continue

            if "stages" in dep and self.stage not in dep["stages"]:
                self.log("Skipping dependency {} because it is inactive in stage (function) {}".format(dep["name"],
                                                                                                       self.stage))
                continue

            if len(self.dep_groups) == 0 or self.dep_groups[-1]["type"] != dep_type or dep_type not in self.group_pms:
                self.dep_groups.append({"type": dep_type, "deps": [], "index": len(self.dep_groups)})
            self.dep_groups[-1]["deps"].append(dep)

    def _get_local_bundled_dependencies(self):
        bundled_depends_by_region = self.run_spec.get("bundledDependsByRegion")
        if bundled_depends_by_region is None:
            return self.run_spec.get("bundledDepends", [])
        else:
            # Resolve the local bundled dependencies for this job.
            job_region = self.job_desc["region"]
            return bundled_depends_by_region[job_region]

    def log(self, message):
        if self.logger:
            self.logger.info(message)
        else:
            print(message)

    def generate_shellcode(self, dep_group):
        base_apt_shellcode = "export DEBIAN_FRONTEND=noninteractive && apt-get install --yes --no-install-recommends {p}"
        dx_apt_update_shellcode = "apt-get update -o Dir::Etc::sourcelist=sources.list.d/nucleus.list -o Dir::Etc::sourceparts=- -o APT::Get::List-Cleanup=0"
        apt_err_msg = "APT failed, retrying with full update against official package repository"
        apt_shellcode_template = "({dx_upd} && {inst}) || (echo {e}; apt-get update && {inst})"
        apt_shellcode = apt_shellcode_template.format(dx_upd=dx_apt_update_shellcode,
                                                      inst=base_apt_shellcode,
                                                      e=apt_err_msg)

        def make_pm_atoms(packages, version_separator="="):
            package_atoms = (p["name"] + (version_separator + p["version"] if "version" in p else "")
                             for p in packages)
            return " ".join(map(str, package_atoms))

        dep_type, packages = dep_group["type"], dep_group["deps"]
        if dep_type == "apt":
            return apt_shellcode.format(p=make_pm_atoms(packages))
        elif dep_type == "pip":
            return "pip install --upgrade " + make_pm_atoms(packages, version_separator="==")
        elif dep_type == "pip3":
            return "pip3 install --upgrade " + make_pm_atoms(packages, version_separator="==")
        elif dep_type == "gem":
            commands = []
            for p in packages:
                commands.append("gem install " + p["name"])
                if "version" in p:
                    commands[-1] += " --version " + p["version"]
            return " && ".join(map(str, commands))
        elif dep_type == "cpan":
            return "cpanm --notest " + make_pm_atoms(packages, version_separator="~")
        elif dep_type == "cran":
            repo = "http://cloud.r-project.org"
            r_preamble = "die <- function() { q(status=1) }; options(error=die); options(warn=2);"
            r_preamble += 'r <- getOption("repos"); r["CRAN"] = "{repo}"; options(repos=r)'.format(repo=repo)
            r_cmd_template = "R -e '{preamble}; {cmd}'"
            bootstrap_cmd = 'install.packages("devtools")'
            commands = []
            for package in packages:
                if "version" in package:
                    cmd = 'require(devtools); install_version("{name}", version="{version}")'.format(**package)
                else:
                    cmd = 'install.packages("{}")'.format(package["name"])
                commands.append(r_cmd_template.format(preamble=r_preamble, cmd=cmd))
            if any("require(devtools)" in cmd for cmd in commands):
                commands.insert(0, r_cmd_template.format(preamble=r_preamble, cmd=bootstrap_cmd))
            return " && ".join(commands)
        elif dep_type == "git":
            commands = ["apt-get install --yes git make", "export GIT_SSH=dx-git-ssh-helper"]
            for dep in packages:
                subcommands = []
                build_dir = str(dep.get("destdir", "$(mktemp -d)"))
                subcommands.append("mkdir -p %s" % build_dir)
                subcommands.append("cd %s" % build_dir)
                subcommands.append("git clone " + str(dep["url"]))
                subdir = re.search(r"([^/]+)$", str(dep["url"])).group(1)
                if subdir.endswith(".git"):
                    subdir = subdir[:-len(".git")]
                subcommands.append("cd '%s'" % subdir)
                if "tag" in dep:
                    subcommands.append("git checkout " + str(dep["tag"]))
                if "build_commands" in dep:
                    subcommands.append(str(dep["build_commands"]))
                commands.append("(" + " && ".join(subcommands) + ")")
            return " && ".join(commands)
        else:
            raise DXExecDependencyError("Package manager type {pm} not supported".format(pm=dep_type))

    def run(self, cmd, log_fh=None):
        subprocess.check_call(cmd, shell=True, stdout=log_fh, stderr=log_fh)

    def _install_dep_group(self, dep_group):
        self.log("Installing {} packages {}".format(dep_group["type"],
                                                    ", ".join(dep["name"] for dep in dep_group["deps"])))
        cmd = self.generate_shellcode(dep_group)
        log_filename = os.path.join(tempfile.gettempdir(), "dx_{type}_install_{index}.log".format(**dep_group))
        try:
            with open(log_filename, "w") as fh:
                self.run(cmd, log_fh=fh)
        except subprocess.CalledProcessError:
            with open(log_filename) as fh:
                sys.stdout.write(fh.read())
            raise DXExecDependencyError("Error while installing {type} packages {deps}".format(**dep_group))

    def _install_dep_bundle(self, bundle):
        if bundle["id"].get("$dnanexus_link", "").startswith("file-"):
            self.log("Downloading bundled file {name}".format(**bundle))
            try:
                dxpy.download_dxfile(bundle["id"], bundle["name"], project=dxpy.WORKSPACE_ID)
            except dxpy.exceptions.ResourceNotFound:
                dxpy.download_dxfile(bundle["id"], bundle["name"])
            self.run("dx-unpack {}".format(pipes.quote(bundle["name"])))
        else:
            self.log('Skipping bundled dependency "{name}" because it does not refer to a file'.format(**bundle))

    def install(self):
        for dep_group in self.dep_groups:
            if dep_group["type"] == "bundle":
                self._install_dep_bundle(dep_group["deps"][0])
            else:
                self._install_dep_group(dep_group)

    def _validate_dependency(self, dep):
        if "name" not in dep:
            raise DXExecDependencyError('Expected field "name" to be present in execution dependency "{}"'.format(dep))
        elif dep.get("package_manager") == "git" and "url" not in dep:
            raise DXExecDependencyError('Execution dependency "{}" does not have a "url" field'.format(dep))

    def _get_dependency_type(self, dep):
        if "id" in dep:
            return "bundle"
        else:
            return dep.get("package_manager", "apt")

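# Usage sketch with a hypothetical describe output: dependencies are grouped
# by package manager at construction time, and install() runs one shell
# command per group:
#
#     executable_desc = {"runSpec": {"execDepends": [
#         {"name": "samtools"},                                 # apt (default)
#         {"name": "requests", "package_manager": "pip"}]}}
#     job_desc = {"function": "main"}
#     DXExecDependencyInstaller(executable_desc, job_desc).install()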