Commit 66f47b40 authored by abaumann

Updating to make use of cloudtools. Using the mains and inheriting arguments is awkward - need to find a better approach to this.
parent 66c4a415
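The "use the mains and inherit arguments" pattern the commit message describes is visible in the diff below: the wrapper hands its argparse parser to cloudtools' start.init_parser to inherit every cluster flag, then calls start.main, submit.main, and stop.main directly instead of shelling out. As a minimal sketch (parser setup and entry points are taken from the new code in this commit; everything else is assumed):

import argparse
from cloudtools import start, submit, stop

# Inherit every cluster-creation flag that `cloudtools start` defines,
# then add only the job-specific arguments this wrapper needs.
parser = argparse.ArgumentParser(description='Submit and wait for a Hail job.')
start.init_parser(parser)
parser.add_argument('--args', type=str, help='Arguments forwarded to the Hail script.')
parser.add_argument('script', type=str)
args = parser.parse_args()

start.main(args)            # create the cluster via the `cloudtools start` entry point
job_id = submit.main(args)  # submit the script via the `cloudtools submit` entry point
stop.main(args)             # tear the cluster down via the `cloudtools stop` entry point

The awkwardness the message calls out is presumably that one flat argument namespace has to satisfy all three mains at once.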
Dockerfile

@@ -2,6 +2,8 @@ FROM google/cloud-sdk:170.0.1-slim
 COPY hail_submit.py /hail_submit.py
+# TODO: switch to actual repo once PR is merged
+RUN pip install git+https://github.com/abaumann/cloudtools.git@ab_client_lib
 RUN pip install --upgrade google-api-python-client
 RUN pip install google-cloud-storage google-auth-httplib2
hail_submit.py

@@ -3,6 +3,15 @@ import googleapiclient.discovery
 import google.auth
 from google.cloud import storage
 import uuid
+from cloudtools import start, submit, stop
+
+
+class ClusterConfig(object):
+    def __init__(self, name, balance=0.0):
+        """Return a ClusterConfig object whose name is *name* and whose
+        starting balance is *balance*."""
+        self.name = name
+        self.balance = balance
 
 # functions
 def create_cluster(dataproc, project, region, cluster_name,
@@ -78,16 +87,17 @@ def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
         print("Cluster created.")
         break
 
-def submit_pyspark_job(dataproc, project, region,
-                       cluster_name, bucket_name, hail_script_path, script_arguments):
+def submit_pyspark_job(dataproc, spark_version, project, region,
+                       cluster_name, bucket_name, hail_hash, hail_script_path, script_arguments):
 
-    storage_client = storage.Client()
-    bucket = storage_client.get_bucket("hail-common")
-    blob = bucket.blob("latest-hash.txt")
-    # get the hash from this text file, removing any trailing newline
-    hail_hash = blob.download_as_string().rstrip()
+    if not hail_hash:
+        storage_client = storage.Client()
+        bucket = storage_client.get_bucket("hail-common")
+        blob = bucket.blob("latest-hash.txt")
+        # get the hash from this text file, removing any trailing newline
+        hail_hash = blob.download_as_string().rstrip()
 
-    hail_jar_file = "hail-hail-is-master-all-spark2.0.2-{}.jar".format(hail_hash)
+    hail_jar_file = "hail-hail-is-master-all-spark{}-{}.jar".format(spark_version, hail_hash)
     hail_jar_path = "gs://hail-common/{}".format(hail_jar_file)
 
     # upload the hail script to this dataproc staging bucket
@@ -182,68 +192,29 @@ def get_client():
     return dataproc
 
 
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description='')
-
-    # required dataproc arguments
-    parser.add_argument('--dataprocMasterMachType', required=True,
-                        help='Machine type to use for the master machine, e.g. n1-standard-8.')
-    parser.add_argument('--dataprocMasterBootDiskSize', required=True,
-                        help='Size of the boot disk to use for the master machine in GB, e.g. 100')
-    parser.add_argument('--dataprocNumWorkers', required=True,
-                        help='Number of worker nodes, e.g. 2')
-    parser.add_argument('--dataprocWorkerMachType', required=True,
-                        help='Machine type to use for the worker nodes, e.g. n1-standard-8.')
-    parser.add_argument('--dataprocWorkerBootDiskSize', required=True,
-                        help='Size of the boot disk to use for the worker nodes in GB, e.g. 100')
-    parser.add_argument('--dataprocWorkerNumSSD', required=True,
-                        help='Number of SSD disks for use in the worker nodes, e.g. 2')
-    parser.add_argument('--dataprocWorkerPreemptible', required=False, action='store_true', default=False,
-                        help='Use preemptible instances for the worker nodes.')
-
-    # the python hail script to execute on the dataproc cluster
-    parser.add_argument('hailScript', nargs=1)
-
-    # optional dataproc arguments
-    parser.add_argument('--dataprocRegion', dest='dataprocRegion', required=False, default="us-central1",
-                        help='Region in which to create the cluster. Defaults to us-central1.')
-    parser.add_argument('--project', required=False,
-                        help='Project to create the Dataproc cluster within. Defaults to the current project in the gcloud config.')
-
-    # parse the args above as well as user-defined arguments (unknown below) that get passed to the hail script;
-    # the user-defined arguments are whatever the python hail script needs.
-    args, script_args = parser.parse_known_args()
+    main_parser = argparse.ArgumentParser(description='Submit and wait for a Hail job.')
+    start.init_parser(main_parser)
+    main_parser.add_argument('--files', required=False, type=str,
+                             help='Comma-separated list of files to add to the working directory of the Hail application.')
+    main_parser.add_argument('--args', type=str,
+                             help='Quoted string of arguments to pass to the Hail script being submitted.')
+    main_parser.add_argument('script', type=str)
+    args = main_parser.parse_args()
 
     dataproc = get_client()
 
     try:
-        # get the current project from gcloud config
-        project = args.project if args.project else google.auth.default()[1]
-
-        cluster_name = "firecloud-hail-{}".format(uuid.uuid4())
-        print "Creating cluster {} in project: {}".format(cluster_name, project)
-
-        cluster_info = create_cluster(dataproc, project, args.dataprocRegion, cluster_name,
-                                      args.dataprocMasterMachType, args.dataprocMasterBootDiskSize, args.dataprocNumWorkers,
-                                      args.dataprocWorkerMachType, args.dataprocWorkerBootDiskSize, args.dataprocWorkerNumSSD,
-                                      args.dataprocWorkerPreemptible)
-        cluster_uuid = cluster_info["metadata"]["clusterUuid"]
-        active_clusters = wait_for_cluster_creation(dataproc, project, args.dataprocRegion, cluster_name)
-
-        clusters = list_clusters(dataproc, project, args.dataprocRegion)
-        for cluster in clusters["clusters"]:
-            if cluster["clusterUuid"] == cluster_uuid:
-                cluster_staging_bucket = cluster["config"]["configBucket"]
-                job_id = submit_pyspark_job(dataproc, project, args.dataprocRegion,
-                                            cluster_name, cluster_staging_bucket, args.hailScript[0], script_args)
-                job_result = wait_for_job(dataproc, project, args.dataprocRegion, job_id)
-
-                # TODO: what do we need to do to handle successful jobs?
-                print job_result
-                break
+        # cloudtools start
+        start.main(args)
+        job_id = submit.main(args)
+        job_result = wait_for_job(dataproc, args.project, args.region, job_id)
+
+        # TODO: what do we need to do to handle successful jobs?
+        print job_result
     except Exception as e:
         print e
         raise
     finally:
-        delete_cluster(dataproc, project, args.dataprocRegion, cluster_name)
\ No newline at end of file
+        stop.main(args)
\ No newline at end of file
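For reference, wait_for_job is called but not touched by this diff. The script appears to follow Google's Dataproc Python sample, where that helper is a polling loop along these lines (a sketch under that assumption, using the googleapiclient v1 Dataproc client that get_client in this script builds):

import time

def wait_for_job(dataproc, project, region, job_id):
    """Poll the Dataproc jobs API until the job succeeds or fails."""
    print('Waiting for job to finish...')
    while True:
        result = dataproc.projects().regions().jobs().get(
            projectId=project,
            region=region,
            jobId=job_id).execute()
        state = result['status']['state']
        if state == 'ERROR':
            raise Exception(result['status']['details'])
        if state == 'DONE':
            print('Job finished.')
            return result
        time.sleep(5)  # avoid hammering the API while the job runs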