Commit c9b53431 authored by abaumann

Updates after PR review

parent 54d1bd88
#!/usr/bin/env bash

if [[ $1 =~ \.json$ ]]; then
    echo "Authorizing service account using json file"
    gcloud auth activate-service-account --key-file=$1
...
@@ -2,7 +2,7 @@
 from common import *

-def get_pricing(ws_namespace, ws_name, query_sub_id, query_workflow_id, show_all_calls, dataset_project_name, dataset_name):
+def print_submission_pricing(ws_namespace, ws_name, query_sub_id, query_workflow_id, show_all_calls, dataset_project_name, dataset_name):
     print "Retrieving submissions in workspace..."
     workspace_request = firecloud_api.get_workspace(ws_namespace, ws_name)
@@ -29,66 +29,16 @@ def get_pricing(ws_namespace, ws_name, query_sub_id, query_workflow_id, show_all
             workflow_dict[wf_id] = {"submission_id":sub_id, "workflow":wf}

     get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict, query_workflow_id!=None and len(query_workflow_id) > 0, show_all_calls, dataset_project_name, dataset_name)

-class CostRow():
-    def __init__(self, cost, product, resource_type, workflow_id, task_name, call_name):
-        self.cost = cost
-        self.product = product
-        self.resource_type = resource_type
-        self.workflow_id = workflow_id
-        self.task_name = task_name
-        self.call_name = call_name
-
-last_token = None
-token_request_count = 0
-
-def get_token():
-    global token_request_count
-    global last_token
-
-    if token_request_count % 100 == 0:
-        command = "gcloud auth print-access-token"
-        last_token = subprocess.check_output(command, shell=True).decode().strip()
-    token_request_count += 1
-
-    return last_token
-
-# this was added due to an issue with the token expiring while running on a large submission.
-# Not sure why the standard Google library was not handling that properly.
-# TODO: generalize this or figure out why the Google library was not working as expected
-def _fiss_access_headers_local(headers=None):
-    """ Return request headers for fiss.
-        Retrieves an access token with the user's google credentials, and
-        inserts FISS as the User-Agent.
-        Args:
-            headers (dict): Include additional headers as key-value pairs
-    """
-    credentials = GoogleCredentials.get_application_default()
-    access_token = get_token()
-    fiss_headers = {"Authorization" : "bearer " + access_token}
-    fiss_headers["User-Agent"] = firecloud_api.FISS_USER_AGENT
-    if headers:
-        fiss_headers.update(headers)
-    return fiss_headers
-
 def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict, singleWorkflowMode, show_all_calls, dataset_project_name, dataset_name):
-    firecloud_api._fiss_access_headers = _fiss_access_headers_local
     if len(workflow_dict) == 0:
         fail("No submissions or workflows matching the criteria were found.")

     # Imports the Google Cloud client library
     from google.cloud import bigquery

     subquery_template = "labels_value LIKE \"%%%s%%\""
     subquery_list = [subquery_template % workflow_id for workflow_id in workflow_dict]

     print "Gathering pricing data..."
     client = bigquery.Client(dataset_project_name)
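The hunks above build `subquery_list` but never show how it becomes the `workflows_subquery` interpolated into the query's HAVING clause below; a minimal sketch of the likely assembly, assuming the per-workflow LIKE clauses are simply OR-joined (the workflow IDs here are invented):

```python
# Hypothetical illustration -- the IDs are made up, and the OR-join is an
# assumption about how workflows_subquery is derived from subquery_list.
subquery_template = "labels_value LIKE \"%%%s%%\""
workflow_ids = ["0f7bd5e6-4c9a-4b2e-9c1d-2a7f30c41111",
                "a1b2c3d4-0e5f-46a7-b8c9-d0e1f2a32222"]
subquery_list = [subquery_template % workflow_id for workflow_id in workflow_ids]
workflows_subquery = " OR ".join(subquery_list)
# -> labels_value LIKE "%0f7bd5e6-...%" OR labels_value LIKE "%a1b2c3d4-...%"
```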
@@ -107,38 +57,39 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
     for table in dataset.list_tables():
         query = """
             SELECT GROUP_CONCAT(labels.key) WITHIN RECORD AS labels_key,
                    GROUP_CONCAT(labels.value) WITHIN RECORD labels_value,
                    cost,
                    product,
                    resource_type
             FROM %s.%s
             WHERE project.id = '%s'
                   AND
                   labels.key IN ("cromwell-workflow-id",
                                  "cromwell-workflow-name",
                                  "cromwell-sub-workflow-name",
                                  "wdl-task-name",
                                  "wdl-call-alias")
             HAVING %s
             # uncomment for quick testing:
             #LIMIT 1
             """ % (dataset.name, table.name, ws_namespace, workflows_subquery)
         print query

         query_results = client.run_sync_query(query)

         # Use standard SQL syntax for queries.
         # See: https://cloud.google.com/bigquery/sql-reference/
-        #query_results.use_legacy_sql = False
+        # query_results.use_legacy_sql = False

         query_results.run()

-        print "Retrieving BigQuery cost information for workflows %d to %d of %d..." % (query_index_start, min(query_index_end, len(workflow_dict)), len(workflow_dict))
+        print "Retrieving BigQuery cost information for workflows %d to %d of %d..." % (
+            query_index_start, min(query_index_end, len(workflow_dict)), len(workflow_dict))

         page_token = None
         while True:
-            rows, total_rows, page_token = query_results.fetch_data(
+            rows = query_results.fetch_data(
                 max_results=1000,
                 page_token=page_token)
             for row in rows:
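The `fetch_data` change in this hunk tracks an API shift in the `google-cloud-bigquery` client: older releases returned a `(rows, total_rows, page_token)` tuple, while the 0.2x-era client returns an iterator that fetches further pages on demand. A sketch under that assumption:

```python
# Sketch, assuming a client version where fetch_data() returns an iterator;
# max_results then bounds the page size rather than the total result count.
rows = query_results.fetch_data(max_results=1000, page_token=page_token)
for row in rows:
    # each row matches the SELECT list above
    labels_key, labels_value, cost, product, resource_type = row
```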
@@ -156,7 +107,8 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
                     call_name = task_name
                 else:
                     call_name = labels["wdl-call-alias"]
-                workflow_id_to_cost[workflow_id].append(CostRow(cost, product, resource_type, workflow_id, task_name, call_name) )
+                workflow_id_to_cost[workflow_id].append(
+                    CostRow(cost, product, resource_type, workflow_id, task_name, call_name))

                 if workflow_id in workflow_dict:
                     matched_workflow_ids.add(workflow_id)
@@ -192,18 +144,17 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
             workflow_ids_with_no_cost.add(wf_id)
             continue

         workflow_metadata_json = get_workflow_metadata(ws_namespace, ws_name, submission_id, wf_id)
         if "calls" not in workflow_metadata_json:
-            #print ws_namespace, ws_name, submission_id, wf_id, workflow_metadata_json
+            # print ws_namespace, ws_name, submission_id, wf_id, workflow_metadata_json
             continue

         # print json.dumps(workflow_metadata_json, indent=4, sort_keys=True)
         print workflow_metadata_json["calls"].keys()

-        #workflow_metadata_json = workflow_id_to_metadata_json[wf_id]
-        calls_lower_json = dict((k.split(".")[-1].lower(), v) for k, v in workflow_metadata_json["calls"].iteritems())
+        # workflow_metadata_json = workflow_id_to_metadata_json[wf_id]
+        calls_lower_json = dict(
+            (k.split(".")[-1].lower(), v) for k, v in workflow_metadata_json["calls"].iteritems())

         if len(calls_lower_json) == 0:
             print "\tNo Calls."
@@ -238,7 +189,8 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
         for call_name in call_name_to_cost:
             print call_name, calls_lower_json.keys(), calls_lower_translated_json.keys()
-            calls = calls_lower_json[call_name] if call_name in calls_lower_json else calls_lower_translated_json[call_name]
+            calls = calls_lower_json[call_name] if call_name in calls_lower_json else calls_lower_translated_json[
+                call_name]

             call_pricing = call_name_to_cost[call_name]
@@ -260,10 +212,13 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
                 for call in sorted(calls, key=lambda call: call["attempt"]):
                     start = dateutil.parser.parse(call["start"])
                     end = dateutil.parser.parse(call["end"])
-                    preempted = "[preempted]" if call["preemptible"] == True and call["executionStatus"] == "RetryableFailure" else ""
-                    print "|\t|\t * Attempt #%d: start: %s - end: %s (elapsed: %s) %s" % (call["attempt"], start, end, end-start, preempted)
+                    preempted = "[preempted]" if call["preemptible"] == True and call[
+                        "executionStatus"] == "RetryableFailure" else ""
+                    print "|\t|\t * Attempt #%d: start: %s - end: %s (elapsed: %s) %s" % (
+                        call["attempt"], start, end, end - start, preempted)

-            sorted_costs = sorted(resource_type_to_pricing, key=lambda rt: resource_type_to_pricing[rt], reverse=True)
+            sorted_costs = sorted(resource_type_to_pricing, key=lambda rt: resource_type_to_pricing[rt],
+                                  reverse=True)

             if len(sorted_costs) > 0:
                 for resource_type in sorted_costs:
@@ -285,23 +240,75 @@ def get_workflow_pricing(ws_namespace, ws_name, workflow_dict, submission_dict,
                     workflow_ids_with_no_cost.add(wf_id)
                     print "|\t|\t\t\t(missing cost information)"

-        print "|\t| %s"%("-"*100)
-        print "|\t'--> Workflow Cost: $%f (cpu: $%f | disk: $%f | other: $%f)\n|" % (total, cpu_cost, pd_cost, other_cost)
+        print "|\t| %s" % ("-" * 100)
+        print "|\t'--> Workflow Cost: $%f (cpu: $%f | disk: $%f | other: $%f)\n|" % (
+            total, cpu_cost, pd_cost, other_cost)

         submission_total += total
         submission_pd_cost += pd_cost
         submission_cpu_cost += cpu_cost
         submission_other_cost += other_cost

     if singleWorkflowMode:
         # only a single workflow
         if query_workflow_id!=None and len(query_workflow_id) > 0:
             print "'%s" % ("-" * 100)
         else:
             print "| %s" % ("-" * 100)

     missing_workflows = len(workflow_ids_with_no_cost) > 0
-    caveat_text = (" (** for %d out of %d workflows)")%(wf_count-len(workflow_ids_with_no_cost), wf_count) if missing_workflows else ""
-    print "'--> Submission Cost%s: $%f (cpu: $%f | disk: $%f | other: $%f)\n" % (caveat_text, submission_total, submission_cpu_cost, submission_pd_cost, submission_other_cost)
+    caveat_text = (" (** for %d out of %d workflows)") % (
+        wf_count - len(workflow_ids_with_no_cost), wf_count) if missing_workflows else ""
+    print "'--> Submission Cost%s: $%f (cpu: $%f | disk: $%f | other: $%f)\n" % (
+        caveat_text, submission_total, submission_cpu_cost, submission_pd_cost, submission_other_cost)

     if missing_workflows:
-        print " ** %d workflows without cost information, e.g. %s" % (len(workflow_ids_with_no_cost), next(iter(workflow_ids_with_no_cost)))
+        print " ** %d workflows without cost information, e.g. %s" % (
+            len(workflow_ids_with_no_cost), next(iter(workflow_ids_with_no_cost)))

+class CostRow():
+    def __init__(self, cost, product, resource_type, workflow_id, task_name, call_name):
+        self.cost = cost
+        self.product = product
+        self.resource_type = resource_type
+        self.workflow_id = workflow_id
+        self.task_name = task_name
+        self.call_name = call_name
+
+last_token = None
+token_request_count = 0
+
+def get_token():
+    global token_request_count
+    global last_token
+
+    if token_request_count % 100 == 0:
+        command = "gcloud auth print-access-token"
+        last_token = subprocess.check_output(command, shell=True).decode().strip()
+    token_request_count += 1
+
+    return last_token
+
+# this was added due to an issue with the token expiring while running on a large submission.
+# Not sure why the standard Google library was not handling that properly.
+# TODO: generalize this or figure out why the Google library was not working as expected
+def _fiss_access_headers_local(headers=None):
+    """ Return request headers for fiss.
+        Retrieves an access token with the user's google credentials, and
+        inserts FISS as the User-Agent.
+        Args:
+            headers (dict): Include additional headers as key-value pairs
+    """
+    credentials = GoogleCredentials.get_application_default()
+    access_token = get_token()
+    fiss_headers = {"Authorization" : "bearer " + access_token}
+    fiss_headers["User-Agent"] = firecloud_api.FISS_USER_AGENT
+    if headers:
+        fiss_headers.update(headers)
+    return fiss_headers
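The comment block above explains why `get_token` shells out to `gcloud` and re-mints the token every 100 requests: tokens were expiring mid-run on large submissions. One way to address the TODO would be to let `google-auth` refresh application-default credentials on expiry; a minimal sketch, not part of this commit, assuming the `google-auth` package rather than the `oauth2client` API used here:

```python
# Sketch only: refresh application-default credentials when they expire,
# instead of re-running "gcloud auth print-access-token" every 100 calls.
import google.auth
import google.auth.transport.requests

_credentials, _project = google.auth.default()

def get_token():
    if not _credentials.valid:
        # fetches or renews the short-lived access token in place
        _credentials.refresh(google.auth.transport.requests.Request())
    return _credentials.token
```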
 def main():
     setup()
@@ -329,7 +336,7 @@ def main():
     if not args.dataset_name:
         print "Note that this script expects the billing export table to be named 'billing_export'. "

-    get_pricing(args.ws_namespace, args.ws_name, args.submission_id, args.workflow_id, args.show_all_calls,
+    print_submission_pricing(args.ws_namespace, args.ws_name, args.submission_id, args.workflow_id, args.show_all_calls,
                 args.dataset_project if args.dataset_project else args.ws_namespace,
                 args.dataset_name if args.dataset_name else 'billing_export')
@@ -129,7 +129,7 @@ def _fiss_access_headers_local(headers=None):
 def setup():
     firecloud_api._fiss_access_headers = _fiss_access_headers_local

     registration_info = requests.get("https://api.firecloud.org/register", headers=firecloud_api._fiss_access_headers())
-    if registration_info.status_code == 404:
+    if registration_info.status_code != 200:
         fail("This account is not registered with FireCloud.")
     print "Using credentials for firecloud account:", registration_info.json()["userInfo"]["userEmail"]
@@ -4,4 +4,4 @@ This script will take a submission id from a given workspace and bind the output
 Optionally an expression override argument can be given that allows new output expressions to be defined and override the existing method config's output expressions. This can be used, for example, to bind an output that did not originally have an expression defined for it when the analysis was run.

 Run this as follows (from the main directory):
-```./run.sh rebind_output_attributes/rebind_output_attributes.py -p <workspace project> -n <workspace name> -s <submission id of the submission you want the outputs from> -e <optional, if used this can override output expressions used in the method config for this submission. Syntax is in the form {"output_name": "expression"}```
-\ No newline at end of file
+```./run.sh rebind_output_attributes/rebind_output_attributes.py -p <workspace project> -n <workspace name> -s <submission id of the overriding outputs> -e <optional, if used this can override output expressions used in the method config for this submission. Syntax is in the form {"output_name": "expression"}```
+\ No newline at end of file
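For concreteness, a hypothetical invocation (the project, workspace, submission id, and output expression are all invented; output expressions typically take the `this.<attribute>` form):

```
./run.sh rebind_output_attributes/rebind_output_attributes.py -p my-billing-project -n my-workspace -s 6f1e0bd2-3c44-4e0b-9f2d-0a1b2c3d4e5f -e '{"cleaned_bam": "this.cleaned_bam"}'
```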