Source code for gcloud_utils.dataproc

"""
Module to handle with Dataproc cluster
"""

import time
import logging
import datetime
import os
import re
from googleapiclient import discovery

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')


[docs]class Dataproc(object): """Module to handle with Dataproc cluster""" def __init__(self, project, region, http=None): self.__project = project self.__region = region self.__logger = logging.getLogger(name=self.__class__.__name__) self.__client = discovery.build('dataproc', 'v1', http=http) self.__pattern = re.compile(r'[\W_]+') def __format_job_id(self, job_id): return self.__pattern.sub('_', job_id)
[docs] def list_clusters(self): """List all clusters""" request = self.__client.projects().regions().clusters() result = request.list(projectId=self.__project, region=self.__region).execute() cluster_list = [] if 'clusters' in result: cluster_list.append(result['clusters']) while 'nextPageToken' in result: token = result['nextPageToken'] result = request.list( projectId=self.__project, region=self.__region, pageToken=token).execute() if 'clusters' in result: cluster_list.append(result['clusters']) return cluster_list
def __wait_midle_state(self, cluster_name, final_state, sleep_time=5): midle_state = True while midle_state: result = self.__client.projects()\ .regions()\ .clusters()\ .get(clusterName=cluster_name, projectId=self.__project, region=self.__region)\ .execute() midle_state = not result['status']['state'] == final_state self.__logger.info( "Cluster_Name:%s Status:%s", cluster_name, result['status']['state']) time.sleep(sleep_time) return not midle_state
[docs] def create_cluster(self, name, workers, workers_names=None, image_version='1.2.54-deb8', disk_size_in_gb=10, metadata=None, initialization_actions=None): """Create a cluster""" if workers_names is None: workers_names = ["worker" + str(i) for i in range(1, workers+1)] # Create a cluster data_to_create = { "projectId": self.__project, "clusterName": name, "config": { "configBucket": "", "gceClusterConfig": { "subnetworkUri": "default", "zoneUri": "{}-b".format(self.__region) }, "masterConfig": { "numInstances": 1, "instanceNames": [ "cluster-yarn-recsys-m" ], "machineTypeUri": "n1-standard-4", "diskConfig": { "bootDiskSizeGb": disk_size_in_gb, "numLocalSsds": 0 } }, "workerConfig": { "numInstances": workers, "instanceNames": workers_names, "machineTypeUri": "n1-standard-4", "diskConfig": { "bootDiskSizeGb": disk_size_in_gb, "numLocalSsds": 0 } }, "softwareConfig": { "imageVersion": image_version }, } } if metadata: data_to_create['config']['gceClusterConfig'].update( {"metadata": metadata}) if initialization_actions: data_to_create['config'].update( {"initializationActions": initialization_actions}) result = self.__client.projects()\ .regions()\ .clusters()\ .create(body=data_to_create, projectId=self.__project, region=self.__region)\ .execute() self.__wait_midle_state(name, "RUNNING") return result
[docs] def delete_cluster(self, name): """Delete cluster by name""" result = self.__client.projects()\ .regions()\ .clusters()\ .delete(clusterName=name, projectId=self.__project, region=self.__region)\ .execute() self.__wait_midle_state(name, "DELETING") return result
def __submit_job(self, job_id, cluster_name, submit_dict): job_details = { "projectId": self.__project, "job": { "placement": { "clusterName": cluster_name }, "reference": { "jobId": job_id } } } job_details["job"].update(submit_dict) result = self.__client.projects().regions().jobs().submit( projectId=self.__project, region=self.__region, body=job_details).execute() self.__logger.info("Submitted job ID %s", job_id) self.__wait_job_finish(job_id) return result
[docs] def submit_pyspark_job(self, cluster_name, gs_bucket, list_args, main_pyspark_file, python_files, archive_uris=None, properties=None): """Submit the pyspark job to cluster, assuming py files at `python_files` list has already been uploaded to `gs_bucket """ gs_root = "gs://{}/".format(gs_bucket) datetime_now = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S") main_python_file = os.path.join(gs_root, main_pyspark_file) job_id = "pyspark_{}_{}".format( os.path.basename(main_pyspark_file), datetime_now) job_id_formated = self.__format_job_id(job_id) gs_python_files = [os.path.join(gs_root, python_file) for python_file in python_files] submit_dict = { "pysparkJob": { "mainPythonFileUri": main_python_file, "args": list_args, "pythonFileUris": gs_python_files } } if archive_uris: submit_dict["pysparkJob"].update({"archiveUris": archive_uris}) if properties: submit_dict["pysparkJob"].update({"properties": properties}) return self.__submit_job(job_id_formated, cluster_name, submit_dict)
[docs] def submit_spark_job(self, cluster_name, gs_bucket, list_args, jar_paths, main_class, properties=None): """Submits the Spark job to the cluster, assuming jars at `jar_paths` list has already been uploaded to `gs_bucket`""" gs_root = "gs://{}/".format(gs_bucket) datetime_now = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S") jar_files = [os.path.join(gs_root, x) for x in jar_paths] main_class_formatted = self.__format_job_id(main_class) job_id = "spark_{}_{}".format(main_class_formatted, datetime_now) submit_dict = { "sparkJob": { "args": list_args, "mainClass": main_class, "jarFileUris": jar_files } } if properties: submit_dict["sparkJob"]["properties"] = properties return self.__submit_job(job_id, cluster_name, submit_dict)
def __wait_job_finish(self, job_id, sleep_time=5): request = self.__client.projects().regions().jobs().get( projectId=self.__project, region=self.__region, jobId=job_id) result = request.execute() status = result['status']['state'] self.__logger.info("JOB %s - STATUS:%s", job_id, status) while status not in ('ERROR', 'DONE'): result = request.execute() status = result['status']['state'] if status == 'ERROR': raise Exception(result['status']['details']) if status == 'DONE': self.__logger.info("Job finished.") return result self.__logger.info("JOB %s - STATUS:%s", job_id, status) # if 'yarnApplications' in result: # progress = result['yarnApplications'][0]['progress'] # self.__logger.info("Progress: %s%%", progress*100) time.sleep(sleep_time)