Commit 740ea428 authored by abaumann, committed by GitHub

Merge pull request #8 from broadinstitute/ab_fix_auth_etc

Added auth script to help switch app default credentials ...
parents 68dbefb6 c9b53431
auth.sh 0 → 100755
#!/usr/bin/env bash
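# Example usage (values are hypothetical; the leading dot runs the script in
# the current shell so the export below takes effect):
#   . auth.sh jane.doe@example.org
#   . auth.sh /path/to/service-account-key.json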
if [[ $1 =~ \.json$ ]]; then
    echo "Authorizing service account using json file"
    gcloud auth activate-service-account --key-file="$1"
    export GOOGLE_APPLICATION_CREDENTIALS="$1"
elif [[ $1 =~ "@" ]]; then
    echo "Authorizing using email address"
    gcloud auth login "$1"
    export GOOGLE_APPLICATION_CREDENTIALS=~/.config/gcloud/legacy_credentials/"$1"/adc.json
else
    echo "Run this script in the following manner:"
    echo " To auth as a regular user: '. auth.sh <your email>'"
    echo " To auth as a service account: '. auth.sh <path to service account json>'"
    echo "NOTE: The dot before auth.sh is important, as it sets an environment variable in your current shell"
fi
\ No newline at end of file
@@ -2,7 +2,7 @@
from common import *
def get_pricing(ws_namespace, ws_name, query_sub_id = None, query_workflow_id = None, show_all_calls=False):
def print_submission_pricing(ws_namespace, ws_name, query_sub_id, query_workflow_id, show_all_calls, dataset_project_name, dataset_name):
print "Retrieving submissions in workspace..."
workspace_request = firecloud_api.get_workspace(ws_namespace, ws_name)
@@ -29,70 +29,20 @@ def get_pricing(ws_namespace, ws_name, query_sub_id = None, query_workflow_id =
workflow_dict[wf_id] = {"submission_id":sub_id, "workflow":wf}
get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict, query_workflow_id!=None and len(query_workflow_id) > 0, show_all_calls)
class CostRow():
def __init__(self, cost, product, resource_type, workflow_id, task_name, call_name):
self.cost = cost
self.product = product
self.resource_type = resource_type
self.workflow_id = workflow_id
self.task_name = task_name
self.call_name = call_name
last_token = None
token_request_count = 0
def get_token():
global token_request_count
global last_token
if token_request_count % 100 == 0:
command = "gcloud auth print-access-token"
last_token = subprocess.check_output(command, shell=True).decode().strip()
token_request_count += 1
return last_token
# This was added due to an issue with the token expiring while running on a large submission.
# It is not clear why the standard Google library was not handling that properly.
# TODO: generalize this or figure out why the Google library was not working as expected
def _fiss_access_headers_local(headers=None):
""" Return request headers for fiss.
Retrieves an access token with the user's Google credentials, and
inserts FISS as the User-Agent.
Args:
headers (dict): Include additional headers as key-value pairs
"""
credentials = GoogleCredentials.get_application_default()
access_token = get_token()
fiss_headers = {"Authorization" : "bearer " + access_token}
fiss_headers["User-Agent"] = firecloud_api.FISS_USER_AGENT
if headers:
fiss_headers.update(headers)
return fiss_headers
def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict, singleWorkflowMode, show_all_calls):
firecloud_api._fiss_access_headers = _fiss_access_headers_local
if len(workflow_dict) == 0:
fail("No submissions or workflows matching the criteria were found.")
# Imports the Google Cloud client library
from google.cloud import bigquery
subquery_template = "labels_value LIKE \"%%%s%%\""
subquery_list = [subquery_template % workflow_id for workflow_id in workflow_dict]
print "Gathering pricing data..."
client = bigquery.Client(ws_namespace)
dataset = client.dataset('billing_export')
client = bigquery.Client(dataset_project_name)
dataset = client.dataset(dataset_name)
matched_workflow_ids = set()
@@ -107,38 +57,39 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
for table in dataset.list_tables():
query = """
SELECT GROUP_CONCAT(labels.key) WITHIN RECORD AS labels_key,
GROUP_CONCAT(labels.value) WITHIN RECORD labels_value,
cost,
product,
resource_type
FROM %s.%s
WHERE project.id = '%s'
AND
labels.key IN ("cromwell-workflow-id",
"cromwell-workflow-name",
"cromwell-sub-workflow-name",
"wdl-task-name",
"wdl-call-alias")
HAVING %s
# uncomment for quick testing:
#LIMIT 1
""" % (dataset.name, table.name, ws_namespace, workflows_subquery)
SELECT GROUP_CONCAT(labels.key) WITHIN RECORD AS labels_key,
GROUP_CONCAT(labels.value) WITHIN RECORD labels_value,
cost,
product,
resource_type
FROM %s.%s
WHERE project.id = '%s'
AND
labels.key IN ("cromwell-workflow-id",
"cromwell-workflow-name",
"cromwell-sub-workflow-name",
"wdl-task-name",
"wdl-call-alias")
HAVING %s
# uncomment for quick testing:
#LIMIT 1
""" % (dataset.name, table.name, ws_namespace, workflows_subquery)
print query
query_results = client.run_sync_query(query)
# Use standard SQL syntax for queries.
# See: https://cloud.google.com/bigquery/sql-reference/
#query_results.use_legacy_sql = False
# query_results.use_legacy_sql = False
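# Note (added for clarity): the GROUP_CONCAT ... WITHIN RECORD syntax above is
# BigQuery legacy SQL, so use_legacy_sql must stay at its default (True) here.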
query_results.run()
print "Retrieving BigQuery cost information for workflows %d to %d of %d..." % (query_index_start, min(query_index_end, len(workflow_dict)), len(workflow_dict))
print "Retrieving BigQuery cost information for workflows %d to %d of %d..." % (
query_index_start, min(query_index_end, len(workflow_dict)), len(workflow_dict))
page_token = None
while True:
rows, total_rows, page_token = query_results.fetch_data(
rows = query_results.fetch_data(
max_results=1000,
page_token=page_token)
for row in rows:
@@ -156,7 +107,8 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
call_name = task_name
else:
call_name = labels["wdl-call-alias"]
workflow_id_to_cost[workflow_id].append(CostRow(cost, product, resource_type, workflow_id, task_name, call_name) )
workflow_id_to_cost[workflow_id].append(
CostRow(cost, product, resource_type, workflow_id, task_name, call_name))
if workflow_id in workflow_dict:
matched_workflow_ids.add(workflow_id)
@@ -192,18 +144,17 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
workflow_ids_with_no_cost.add(wf_id)
continue
workflow_metadata_json = get_workflow_metadata(ws_namespace, ws_name, submission_id, wf_id)
if "calls" not in workflow_metadata_json:
#print ws_namespace, ws_name, submission_id, wf_id, workflow_metadata_json
# print ws_namespace, ws_name, submission_id, wf_id, workflow_metadata_json
continue
# print json.dumps(workflow_metadata_json, indent=4, sort_keys=True)
print workflow_metadata_json["calls"].keys()
#workflow_metadata_json = workflow_id_to_metadata_json[wf_id]
calls_lower_json = dict((k.split(".")[-1].lower(), v) for k, v in workflow_metadata_json["calls"].iteritems())
# workflow_metadata_json = workflow_id_to_metadata_json[wf_id]
calls_lower_json = dict(
(k.split(".")[-1].lower(), v) for k, v in workflow_metadata_json["calls"].iteritems())
if len(calls_lower_json) == 0:
print "\tNo Calls."
@@ -238,7 +189,8 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
for call_name in call_name_to_cost:
print call_name, calls_lower_json.keys(), calls_lower_translated_json.keys()
calls = calls_lower_json[call_name] if call_name in calls_lower_json else calls_lower_translated_json[call_name]
calls = calls_lower_json[call_name] if call_name in calls_lower_json else calls_lower_translated_json[
call_name]
call_pricing = call_name_to_cost[call_name]
@@ -260,10 +212,13 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
for call in sorted(calls, key=lambda call: call["attempt"]):
start = dateutil.parser.parse(call["start"])
end = dateutil.parser.parse(call["end"])
preempted = "[preempted]" if call["preemptible"] == True and call["executionStatus"] == "RetryableFailure" else ""
print "|\t|\t * Attempt #%d: start: %s - end: %s (elapsed: %s) %s" % (call["attempt"], start, end, end-start, preempted)
preempted = "[preempted]" if call["preemptible"] == True and call[
"executionStatus"] == "RetryableFailure" else ""
print "|\t|\t * Attempt #%d: start: %s - end: %s (elapsed: %s) %s" % (
call["attempt"], start, end, end - start, preempted)
sorted_costs = sorted(resource_type_to_pricing, key=lambda rt: resource_type_to_pricing[rt], reverse=True)
sorted_costs = sorted(resource_type_to_pricing, key=lambda rt: resource_type_to_pricing[rt],
reverse=True)
if len(sorted_costs) > 0:
for resource_type in sorted_costs:
@@ -285,23 +240,75 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
workflow_ids_with_no_cost.add(wf_id)
print "|\t|\t\t\t(missing cost information)"
print "|\t| %s"%("-"*100)
print "|\t'--> Workflow Cost: $%f (cpu: $%f | disk: $%f | other: $%f)\n|" % (total, cpu_cost, pd_cost, other_cost)
print "|\t| %s" % ("-" * 100)
print "|\t'--> Workflow Cost: $%f (cpu: $%f | disk: $%f | other: $%f)\n|" % (
total, cpu_cost, pd_cost, other_cost)
submission_total += total
submission_pd_cost += pd_cost
submission_cpu_cost += cpu_cost
submission_other_cost += other_cost
if singleWorkflowMode:
# only a single workflow
if query_workflow_id!=None and len(query_workflow_id) > 0:
print "'%s" % ("-" * 100)
else:
print "| %s" % ("-" * 100)
missing_workflows = len(workflow_ids_with_no_cost) > 0
caveat_text = (" (** for %d out of %d workflows)")%(wf_count-len(workflow_ids_with_no_cost), wf_count) if missing_workflows else ""
print "'--> Submission Cost%s: $%f (cpu: $%f | disk: $%f | other: $%f)\n" % (caveat_text, submission_total, submission_cpu_cost, submission_pd_cost, submission_other_cost)
caveat_text = (" (** for %d out of %d workflows)") % (
wf_count - len(workflow_ids_with_no_cost), wf_count) if missing_workflows else ""
print "'--> Submission Cost%s: $%f (cpu: $%f | disk: $%f | other: $%f)\n" % (
caveat_text, submission_total, submission_cpu_cost, submission_pd_cost, submission_other_cost)
if missing_workflows:
print " ** %d workflows without cost information, e.g. %s" % (len(workflow_ids_with_no_cost), next(iter(workflow_ids_with_no_cost)))
print " ** %d workflows without cost information, e.g. %s" % (
len(workflow_ids_with_no_cost), next(iter(workflow_ids_with_no_cost)))
class CostRow():
def __init__(self, cost, product, resource_type, workflow_id, task_name, call_name):
self.cost = cost
self.product = product
self.resource_type = resource_type
self.workflow_id = workflow_id
self.task_name = task_name
self.call_name = call_name
last_token = None
token_request_count = 0
def get_token():
global token_request_count
global last_token
if token_request_count % 100 == 0:
command = "gcloud auth print-access-token"
last_token = subprocess.check_output(command, shell=True).decode().strip()
token_request_count += 1
return last_token
# This was added due to an issue with the token expiring while running on a large submission.
# It is not clear why the standard Google library was not handling that properly.
# TODO: generalize this or figure out why the Google library was not working as expected
def _fiss_access_headers_local(headers=None):
""" Return request headers for fiss.
Retrieves an access token with the user's Google credentials, and
inserts FISS as the User-Agent.
Args:
headers (dict): Include additional headers as key-value pairs
"""
credentials = GoogleCredentials.get_application_default()
access_token = get_token()
fiss_headers = {"Authorization" : "bearer " + access_token}
fiss_headers["User-Agent"] = firecloud_api.FISS_USER_AGENT
if headers:
fiss_headers.update(headers)
return fiss_headers
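# A possible shape for the TODO above (a sketch, not part of this commit):
# refresh the cached token based on its expiry time rather than a fixed
# request count. This assumes oauth2client's AccessTokenInfo, whose fields
# are access_token and expires_in (seconds until expiry).
import time

_cached_token = None
_cached_token_expiry = 0

def get_token_by_expiry(refresh_margin_seconds=300):
    global _cached_token, _cached_token_expiry
    if _cached_token is None or time.time() > _cached_token_expiry - refresh_margin_seconds:
        token_info = GoogleCredentials.get_application_default().get_access_token()
        _cached_token = token_info.access_token
        _cached_token_expiry = time.time() + token_info.expires_in
    return _cached_token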
def main():
setup()
@@ -318,14 +325,20 @@ def main():
parser.add_argument('-c', '--calls', dest='show_all_calls', action='store_true', required=False, help='Expand information about each call.')
parser.add_argument('-dp', '--dataset_project', dest='dataset_project', action='store', required=False, help='Optional project where dataset is stored - defaults to the workspace project.')
parser.add_argument('-dn', '--dataset_name', dest='dataset_name', action='store', required=False, help='Optional dataset name where billing data is stored - defaults to billing_export.')
args = parser.parse_args()
if args.workflow_id and not args.submission_id:
fail("Submission ID must also be provided when querying for a Workflow ID")
print "Note that this script expects the billing export table to be named 'billing_export'. "
if not args.dataset_name:
print "Note that this script expects the billing export table to be named 'billing_export'. "
get_pricing(args.ws_namespace, args.ws_name, args.submission_id, args.workflow_id, args.show_all_calls)
print_submission_pricing(args.ws_namespace, args.ws_name, args.submission_id, args.workflow_id, args.show_all_calls,
args.dataset_project if args.dataset_project else args.ws_namespace,
args.dataset_name if args.dataset_name else 'billing_export')
if __name__ == "__main__":
......
@@ -38,6 +38,8 @@ import dateutil.parser
# firecloud python bindings
from firecloud import api as firecloud_api
from firecloud.fccore import __fcconfig as fcconfig
processes = []
@@ -89,8 +91,7 @@ class DefaultArgsParser:
args = self.parser.parse_args()
if args.fc_url:
firecloud_api.PROD_API_ROOT = args.fc_url
set_fc_url(args.fc_url)
return args
@@ -104,18 +105,42 @@ def list_to_dict(input_list, key_fcn, value_fcn=lambda item: item, transform_fcn
return dicted_list
def setup():
def get_access_token():
return GoogleCredentials.get_application_default().get_access_token().access_token
# only needed until firecloud python library in pypi supports service accounts
def _fiss_access_headers_local(headers=None):
""" Return request headers for fiss.
Retrieves an access token with the user's Google credentials, and
inserts FISS as the User-Agent.
Args:
headers (dict): Include additional headers as key-value pairs
"""
credentials = GoogleCredentials.get_application_default()
print "Using Google client id:", credentials.client_id
credentials = credentials.create_scoped(['https://www.googleapis.com/auth/userinfo.profile', 'https://www.googleapis.com/auth/userinfo.email'])
access_token = credentials.get_access_token().access_token
fiss_headers = {"Authorization" : "bearer " + access_token}
fiss_headers["User-Agent"] = firecloud_api.FISS_USER_AGENT
if headers:
fiss_headers.update(headers)
return fiss_headers
def setup():
firecloud_api._fiss_access_headers = _fiss_access_headers_local
registration_info = requests.get("https://api.firecloud.org/register", headers=firecloud_api._fiss_access_headers())
if registration_info.status_code != 200:
fail("This account is not registered with FireCloud.")
print "Using credentials for firecloud account:", registration_info.json()["userInfo"]["userEmail"]
def get_workflow_metadata(namespace, name, submission_id, workflow_id, *include_keys):
headers = firecloud_api._fiss_access_headers()
include_key_string = "includeKey=%s&" % ("%2C%20".join(list(include_keys))) if include_keys else ""
uri = "{0}/workspaces/{1}/{2}/submissions/{3}/workflows/{4}?&{5}expandSubWorkflows=false".format(
firecloud_api.PROD_API_ROOT, namespace, name, submission_id, workflow_id, include_key_string)
uri = "{0}workspaces/{1}/{2}/submissions/{3}/workflows/{4}?&{5}expandSubWorkflows=false".format(
get_fc_url(), namespace, name, submission_id, workflow_id, include_key_string)
return requests.get(uri, headers=headers).json()
@@ -142,7 +167,7 @@ def get_entity_by_page(namespace, name, entity_type, page, page_size):
headers = firecloud_api._fiss_access_headers()
uri = "{0}/workspaces/{1}/{2}/entityQuery/{3}?page={4}&pageSize={5}".format(
firecloud_api.PROD_API_ROOT, namespace, name, entity_type, page, page_size)
get_fc_url(), namespace, name, entity_type, page, page_size)
return requests.get(uri, headers=headers).json()
@@ -171,11 +196,19 @@ def get_all_bound_file_paths(ws_namespace, ws_name):
return attribute_name_for_url_to_entity_json
def set_fc_url(url):
fcconfig.root_url = url
def get_fc_url():
return fcconfig.root_url
def get_entity_by_page(namespace, name, entity_type, page, page_size):
headers = firecloud_api._fiss_access_headers()
uri = "{0}/workspaces/{1}/{2}/entityQuery/{3}?page={4}&pageSize={5}".format(
firecloud_api.PROD_API_ROOT, namespace, name, entity_type, page, page_size)
uri = "{0}workspaces/{1}/{2}/entityQuery/{3}?page={4}&pageSize={5}".format(
get_fc_url(), namespace, name, entity_type, page, page_size)
return requests.get(uri, headers=headers).json()
@@ -184,7 +217,7 @@ def get_workspace_storage_estimate(namespace, name):
headers = firecloud_api._fiss_access_headers()
uri = "{0}/workspaces/{1}/{2}/storageCostEstimate".format(
firecloud_api.PROD_API_ROOT, namespace, name)
get_fc_url(), namespace, name)
return requests.get(uri, headers=headers)
......
## Rebind outputs from a given submission
This script takes a submission id from a given workspace and binds the outputs produced by that submission to the data model using the method config's output expressions.
Optionally, an expression override argument can be given that defines new output expressions, overriding those in the existing method config. This can be used, for example, to bind an output that did not originally have an expression defined for it when the analysis was run.
Run this as follows (from the main directory):
```./run.sh rebind_output_attributes/rebind_output_attributes.py -p <workspace project> -n <workspace name> -s <submission id whose outputs to rebind> -e <optional; overrides output expressions from the method config, in the form '{"output_name": "expression"}'>```
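For example, with hypothetical workspace and attribute names, rebinding a single output to a new attribute might look like:
```./run.sh rebind_output_attributes/rebind_output_attributes.py -p my-billing-project -n my-workspace -s 01234567-89ab-cdef-0123-456789abcdef -e '{"my_workflow.output_bam": "this.realigned_bam"}'```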
\ No newline at end of file
#!/usr/bin/env python
from common import *
def fix_outputs(ws_namespace, ws_name, submission_id, expressions_override):
# get the workspace
workspace_request = firecloud_api.get_workspace(ws_namespace, ws_name)
if workspace_request.status_code != 200:
fail("Unable to find workspace: %s/%s at %s --- %s" % (
ws_namespace, ws_name, firecloud_api.PROD_API_ROOT, workspace_request.text))
# get the submission
submission_request = firecloud_api.get_submission(ws_namespace, ws_name, submission_id)
if submission_request.status_code != 200:
fail("Unable to find submission: %s" % submission_id)
submission_json = submission_request.json()
# translate the submission entity into workflow entity type
submission_entity_json = submission_json["submissionEntity"]
submission_entity_type = submission_entity_json["entityType"]
# either the submission entity type will be the same as the workflow entity type
# (when it's a submission on a single entity) or it will be a set of those entities
# - if it's a set, just strip _set from the end to get the type the set contains
workflow_entity_type = submission_entity_type.replace("_set", "")
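# e.g. a submission launched on a "sample_set" contains workflows that each run on a "sample"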
# create an update TSV with the attributes to bind back to the data model from this
# given submission's outputs
tsv_filename = 'entity_update.tsv'
with open(tsv_filename, 'wb') as entity_update_tsv:
entity_writer = csv.writer(entity_update_tsv, delimiter='\t')
# id column name is entityType_id
entity_id_column = "%s_id" % workflow_entity_type
# initial headers need to be update:entityType_id
tsv_headers = ['update:%s' % entity_id_column]
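# As an illustration, with a hypothetical "sample" entity and one bound output
# the finished TSV would look like:
#   update:sample_id    realigned_bam
#   sample_001          gs://bucket/sample_001.bam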
# get the method config used by this submission
mc_namespace = submission_json["methodConfigurationNamespace"]
mc_name = submission_json["methodConfigurationName"]
method_config_json = firecloud_api.get_workspace_config(ws_namespace, ws_name, mc_namespace, mc_name).json()
# go through each output and create a header for the attribute that is needed
mc_outputs = method_config_json["outputs"]
mc_output_keys = mc_outputs.keys()
for mc_output_key in mc_output_keys:
# if the user provided an expression override for this output then use that instead
if expressions_override and mc_output_key in expressions_override:
output_expression = expressions_override[mc_output_key]
# otherwise get the output expression from the method config
else:
output_expression = mc_outputs[mc_output_key]
output_attribute = output_expression.replace("this.", "")
tsv_headers.append(output_attribute)
entity_writer.writerow(tsv_headers)
# go through each workflow in this submission
submission_workflows_json = submission_json["workflows"]
succeeded_workflows = [w for w in submission_workflows_json if w["status"] == "Succeeded"]
num_workflows = len(succeeded_workflows)
for workflow_idx, workflow_json in enumerate(succeeded_workflows):
workflow_id = workflow_json["workflowId"]
print "Processing workflow %d of %d: %s" % (workflow_idx + 1, num_workflows, workflow_id)
entity_attribute_updates = []
workflow_entity_name = workflow_json["workflowEntity"]["entityName"]
# the first column needs to be the name of the entity
entity_attribute_updates.append(workflow_entity_name)
# get workflow metadata and outputs that were produced
workflow_metadata_json = get_workflow_metadata(ws_namespace, ws_name, submission_id, workflow_id)
workflow_outputs_json = workflow_metadata_json["outputs"]
# go through each method config output in the same order as the headers
for mc_output_key in mc_output_keys:
workflow_output = workflow_outputs_json[mc_output_key]
# add the value from this workflow output to the same column as the attribute to bind it to
entity_attribute_updates.append(workflow_output)
# write the row values for this entity
entity_writer.writerow(entity_attribute_updates)
upload = prompt_to_continue("Update TSV file has been produced. Would you like to upload this file?")
if upload:
print "Uploading updated entities TSV..."
upload_request = firecloud_api.upload_entities_tsv(ws_namespace, ws_name, tsv_filename)
if upload_request.status_code != 200:
    print "Error uploading updated entities TSV:", upload_request.text
else:
    print "Upload complete."
    os.remove(tsv_filename)
else:
print "The file can be reviewed and manually uploaded - see %s" % tsv_filename
def main():
setup()
# The main argument parser
parser = DefaultArgsParser(description=
"Use an existing submission from a workspace to bind attributes back to the data model. "
"This can be used to fix issues with binding that may have occurred, or to revert outputs "
"to a previous submission. Additionally, an optional expression override can be used to "
"provide a new output expreession for a given output (e.g. to bind an output attribute that "
"was not originally bound.")
# Core application arguments
parser.add_argument('-p', '--workspace_namespace', dest='ws_namespace', action='store', required=True,
help='Workspace namespace')
parser.add_argument('-n', '--workspace_name', dest='ws_name', action='store', required=True, help='Workspace name')
parser.add_argument('-s', '--submission_id', dest='submission_id', action='store', required=True,
help='Submission Id')
parser.add_argument('-e', '--expressions_override', dest='expressions_override', action='store', required=False,
help='Optional argument to override output expressions used in the method config for this submission. Syntax is in the form \'{"output_name": "expression"}\'')
# Call the appropriate function for the given subcommand, passing in the parsed program arguments
args = parser.parse_args()
print "Note -- this script has the following limitations:"
print " * The output expressions must all be of the form 'this.attribute_name' - this does not handle " \
" cases such as 'this.case_sample.attribute_name'"
print " * The root entity type chosen has to be either a single entity of root entity type or a set of " \
" those entities. This will not work for instance if your method runs on a sample and you " \
" chose a pair with the launch expression 'this.case_sample'."
print " * The method config used for this submission can not have been deleted."
continue_script = prompt_to_continue("continue?")
expression_override = json.loads(args.expressions_override) if args.expressions_override else None
if continue_script:
fix_outputs(args.ws_namespace, args.ws_name, args.submission_id, expression_override)
else:
print "Exiting."
if __name__ == "__main__":
main()
\ No newline at end of file