Source code for ephemeris.setup_data_libraries

#!/usr/bin/env python
"""Tool to setup data libraries on a galaxy instance"""
import argparse
import logging as log
import sys
import time

import yaml
from bioblend import galaxy

from .common_parser import (
    DEFAULT_JOB_SLEEP,
    get_common_args,
    HideUnderscoresHelpFormatter,
)


def create_legacy(gi, desc):
    """Create (or reuse) a data library and fill it using the pre-18.05 library API.

    :param gi: Galaxy instance object; only its ``libraries`` and ``folders``
        clients are used.
    :param desc: library definition dict. ``desc["destination"]`` must have
        ``type == "library"`` and may carry ``name``, ``description`` and
        ``synopsis``. Entries containing an ``items`` list are treated as
        folders (created when they have a ``name``); every other entry is
        treated as a file and must have ``src == "url"`` and a ``url``.
    :returns: always an empty list; upload jobs are not reported back.
    :raises Exception: if the destination is not a library, or a file entry
        has a ``src`` other than ``"url"``.

    Files whose URL already appears as a file name in the target folder are
    skipped. A failed upload is logged and skipped; it does not abort the run.
    """
    destination = desc["destination"]
    if destination["type"] != "library":
        raise Exception("Only libraries may be created with pre-18.05 Galaxies using this script.")
    library_name = destination.get("name")
    library_description = destination.get("description")
    library_synopsis = destination.get("synopsis")

    # Reuse the library if it already exists, otherwise create it.
    print("Library name: " + str(library_name))
    # deleted=False can still return deleted libraries, so filter them again here.
    existing_libraries = [
        library
        for library in gi.libraries.get_libraries(name=library_name, deleted=False) or []
        if not library["deleted"]
    ]

    if existing_libraries:
        lib_id = existing_libraries[0]["id"]
        print("Library already exists! id: " + str(lib_id))
        root_folder_id = gi.libraries.show_library(lib_id)["root_folder_id"]
    else:
        lib = gi.libraries.create_library(library_name, library_description, library_synopsis)
        lib_id = lib["id"]
        root_folder_id = lib["root_folder_id"]

    def populate_items(base_folder_id, has_items):
        """Recursively create folders and upload URL files below ``base_folder_id``."""
        if "items" in has_items:
            name = has_items.get("name")
            description = has_items.get("description")
            folder_id = base_folder_id
            if name:
                # Reuse the folder if one with this path already exists.
                # NOTE(review): the path prefix is taken from the first folder
                # returned for the library, not from base_folder_id - confirm
                # that nested folders are matched as intended.
                folders = gi.libraries.get_folders(lib_id)
                new_folder_name = "/" + name
                if folders and not folders[0]["name"] == "/":
                    new_folder_name = folders[0]["name"] + "/" + name
                matching_folders = gi.libraries.get_folders(lib_id, name=new_folder_name)
                if matching_folders:
                    folder_id = matching_folders[0]["id"]
                else:
                    created = gi.libraries.create_folder(lib_id, name, description, base_folder_id=base_folder_id)
                    folder_id = created[0]["id"]
            for item in has_items["items"]:
                populate_items(folder_id, item)
            return

        src = has_items["src"]
        if src != "url":
            raise Exception("For pre-18.05 Galaxies only support URLs src items are supported.")

        folder_contents = gi.folders.show_folder(base_folder_id, contents=True)["folder_contents"]
        existing_file_names = {entry["name"] for entry in folder_contents if entry["type"] == "file"}
        url = has_items["url"]
        if url in existing_file_names:
            # Already present in this folder; do not upload it again.
            return

        # A missing "ext" used to raise KeyError inside the try block below,
        # which was then logged as a failed upload and the file was skipped.
        # Fall back to "auto" so such items are still uploaded.
        file_type = has_items.get("ext", "auto")
        try:
            gi.libraries.upload_file_from_url(
                lib_id,
                url,
                folder_id=base_folder_id,
                file_type=file_type,
            )
        except Exception:
            # Best effort: report the failure and carry on with the other items.
            log.exception(
                "Could not upload %s to %s/%s",
                url,
                lib_id,
                base_folder_id,
            )

    populate_items(root_folder_id, desc)
    return []
def create_batch_api(gi, desc):
    """Send ``desc`` as a single target to the ``tools/fetch`` endpoint.

    This is a generator, so nothing happens until it is iterated. On
    iteration it creates a new history, posts ``desc`` together with that
    history's id to ``<gi.url>/tools/fetch`` and yields the one response.
    """
    history_client = galaxy.histories.HistoryClient(gi)
    tool_client = galaxy.tools.ToolClient(gi)

    target_history = history_client.create_history()
    fetch_url = f"{gi.url}/tools/fetch"
    yield tool_client._post(
        payload={"targets": [desc], "history_id": target_history["id"]},
        url=fetch_url,
    )
def setup_data_libraries(gi, data, training=False, legacy=False):
    """
    Load the files described by a YAML library definition into Galaxy.

    :param gi: Galaxy instance object passed to the bioblend clients.
    :param data: YAML text or open stream, parsed with ``yaml.safe_load``.
    :param training: when true, a missing library name/description defaults
        to the training-data values instead of the generic ones.
    :param legacy: force the legacy library API even if the server reports a
        version with the batch upload API.

    The definition is normalised in place (``libraries``/``files`` become
    ``items``, ``file_type`` becomes ``ext``, items with a ``url`` but no
    ``src`` get ``src: url``), uploaded via ``create_legacy`` or
    ``create_batch_api``, and the function then blocks until every tracked
    job reaches ``ok``, ``error`` or ``deleted``. An empty YAML document is
    reported with a warning and nothing is uploaded.
    """
    log.info("Importing data libraries.")
    jc = galaxy.jobs.JobsClient(gi)
    config = galaxy.config.ConfigClient(gi)
    version = config.get_version()

    if legacy:
        use_legacy = True
    else:
        # Servers that report no version are treated as pre-18.05.
        version_major = version.get("version_major", "16.01")
        use_legacy = version_major < "18.05"
    create_func = create_legacy if use_legacy else create_batch_api

    library_def = yaml.safe_load(data)
    if library_def is None:
        # An empty document parses to None; previously this crashed with a
        # TypeError on the first membership test below.
        log.warning("No library definition found in the input; nothing to import.")
        return

    def normalize_items(has_items):
        """Rewrite older training-material keys into the Galaxy batch format, in place."""
        if "files" in has_items:
            has_items["items"] = has_items.pop("files")

        for item in has_items.get("items", []):
            normalize_items(item)
            if item.get("src") is None and item.get("url"):
                item["src"] = "url"
            if "file_type" in item:
                item["ext"] = item.pop("file_type")

    # Normalize library definitions to allow older ephemeris style and native
    # Galaxy batch upload formats.
    if "libraries" in library_def:
        # File contains multiple definitions.
        library_def["items"] = library_def.pop("libraries")

    if "destination" not in library_def:
        library_def["destination"] = {"type": "library"}
    destination = library_def["destination"]

    if training:
        default_name, default_description = "Training Data", "Data pulled from online archives."
    else:
        default_name, default_description = "New Data Library", ""
    destination["name"] = destination.get("name", default_name)
    destination["description"] = destination.get("description", default_description)

    normalize_items(library_def)

    jobs = list(create_func(gi, library_def))

    # Decide on the strategy actually used, not on the ``legacy`` flag alone:
    # an auto-detected old server also goes through create_legacy, which
    # returns no jobs, so its uploads would otherwise never be waited for.
    if use_legacy:
        # The legacy API does not report its jobs, so track every upload job
        # that is not finished yet.
        job_ids = [
            job["id"]
            for job in jc.get_jobs()
            if job["tool_id"] == "upload1" and job["state"] not in ("ok", "error")
        ]
    else:
        # The batch API response lists the jobs it started.
        job_ids = [subjob["id"] for job in jobs if "jobs" in job for subjob in job["jobs"]]

    terminal_states = ("ok", "error", "deleted")
    while True:
        # One boolean per tracked job: True once that job is terminal.
        job_states = [jc.get_state(job_id) in terminal_states for job_id in job_ids]
        log.debug(
            "Job states: %s",
            ",".join(f"{job_id}={job_state}" for job_id, job_state in zip(job_ids, job_states)),
        )

        if all(job_states):
            break
        time.sleep(DEFAULT_JOB_SLEEP)

    log.info("Finished importing test data.")
def _parser():
    """Build the argument parser for this command.

    The parser extends the one returned by ``get_common_args`` with the
    required ``-i/--infile`` option and the ``--training`` and ``--legacy``
    switches.
    """
    cli = argparse.ArgumentParser(
        description="Populate the Galaxy data library with data.",
        formatter_class=HideUnderscoresHelpFormatter,
        parents=[get_common_args()],
    )
    cli.add_argument("-i", "--infile", required=True, type=argparse.FileType("r"))

    # Both switches are plain on/off flags; only the name and help text differ.
    switches = (
        ("--training", "Set defaults that make sense for training data."),
        (
            "--legacy",
            "Use legacy APIs even for newer Galaxies that should have a batch upload API enabled.",
        ),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, default=False, action="store_true", help=help_text)
    return cli
def main(argv=None):
    """Command-line entry point: parse arguments, connect to Galaxy, import the data.

    :param argv: argument list handed to ``parse_args``; ``None`` makes
        argparse read ``sys.argv``.

    Authentication uses user/password when both are given, otherwise the API
    key; with neither the process exits with an error message. The
    ``galaxy``, ``user``, ``password``, ``api_key`` and ``verbose`` options
    are not defined by ``_parser`` itself, so they are expected to come from
    the common parent parser.
    """
    args = _parser().parse_args(argv)
    try:
        if args.user and args.password:
            gi = galaxy.GalaxyInstance(url=args.galaxy, email=args.user, password=args.password)
        elif args.api_key:
            gi = galaxy.GalaxyInstance(url=args.galaxy, key=args.api_key)
        else:
            sys.exit("Please specify either a valid Galaxy username/password or an API key.")

        if args.verbose:
            log.basicConfig(level=log.DEBUG)

        setup_data_libraries(gi, args.infile, training=args.training, legacy=args.legacy)
    finally:
        # argparse.FileType opens the file but never closes it. Leave stdin
        # alone (it is what "-" maps to) so programmatic callers keep it open.
        if args.infile is not sys.stdin:
            args.infile.close()
# Run the command-line entry point only when this file is executed as a script.
if __name__ == "__main__":
    main()