"""Module to handle Google BigQuery Service"""
import logging
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from gcloud_utils.base_client import BaseClient
from gcloud_utils.bigquery.query_builder import QueryBuilder
class Bigquery(BaseClient):
    """Google-Bigquery handler.

    Thin convenience wrapper around a BigQuery client (held in
    ``self._client``) for running queries, moving tables to and from
    Google Cloud Storage, and creating datasets/tables.
    """

    # Short file-format names accepted by this class, mapped to the values
    # assigned to ``job_config.destination_format`` / ``source_format``.
    FILE_FORMATS = {
        "csv": "CSV",
        "json": "NEWLINE_DELIMITED_JSON",
        "avro": "AVRO",
        "parquet": "PARQUET",
        "orc": "ORC"
    }

    # Short compression names (also used as the file extension), mapped to
    # the values assigned to ``job_config.compression``. ``None`` means the
    # export is not compressed.
    COMPRESSION_FORMATS = {
        None: "NONE",
        "gz": "GZIP",
        "snappy": "SNAPPY"
    }

    # NOTE(review): presumably read by BaseClient to build the default
    # client when none is injected -- confirm against BaseClient.
    _MODEL_CLIENT = bigquery

    def __init__(self, client=None, log_level=logging.ERROR):
        """Initialise the handler.

        Args:
            client: Optional client instance, forwarded to ``BaseClient``.
            log_level: Logging level, forwarded to ``BaseClient``.
        """
        super(Bigquery, self).__init__(client, log_level)
        # Text of the most recent query passed to ``query``.
        self._query = None

    def query(self, query_or_object, **kwargs):
        """Execute a query.

        Args:
            query_or_object: Either the SQL text or a ``QueryBuilder`` whose
                ``query`` attribute holds the SQL text.
            **kwargs: Extra keyword arguments forwarded to the client's
                ``query`` method. Any ``query`` key is overwritten.

        Returns:
            Whatever ``result()`` of the job returned by the client yields.
        """
        if isinstance(query_or_object, QueryBuilder):
            sql = query_or_object.query
        else:
            sql = query_or_object
        kwargs["query"] = sql
        self._query = sql
        return self._client.query(**kwargs).result()

    def query_to_table(self, query_or_object, dataset_id,
                       table_id, write_disposition="WRITE_TRUNCATE",
                       job_config=None, **kwargs):
        """Execute a query and write its result to a specific table.

        Args:
            query_or_object: SQL text or ``QueryBuilder`` (see ``query``).
            dataset_id: Dataset that holds the destination table.
            table_id: Destination table.
            write_disposition: Value assigned to
                ``job_config.write_disposition``.
            job_config: Optional job configuration. When given, its
                ``destination`` and ``write_disposition`` are overwritten
                in place; otherwise a new ``bigquery.QueryJobConfig`` is
                created.
            **kwargs: Extra keyword arguments forwarded to ``query``.

        Returns:
            The return value of ``query``.
        """
        job_config = job_config or bigquery.QueryJobConfig()
        job_config.destination = (
            self._client.dataset(dataset_id).table(table_id))
        job_config.write_disposition = write_disposition
        return self.query(query_or_object, job_config=job_config, **kwargs)

    def _complete_filename(self, filename, export_format, compression_format):
        """Build the wildcard object name used for a table export.

        The result has the shape ``<filename>_*.<export_format>``, with
        ``.<compression_format>`` appended when a compression is requested.

        Args:
            filename: Base name of the exported object(s).
            export_format: One of the keys of ``FILE_FORMATS``.
            compression_format: One of the keys of ``COMPRESSION_FORMATS``
                (``None`` for no compression).

        Returns:
            The completed file name.

        Raises:
            ValueError: If either format is not supported.
        """
        if (compression_format not in self.COMPRESSION_FORMATS or
                export_format not in self.FILE_FORMATS):
            # Keys are stringified because COMPRESSION_FORMATS contains
            # ``None``, which ``str.join`` refuses to concatenate.
            raise ValueError(
                "Only valid file formats: {}. "
                "Only valid compression formats: {}"
                .format(
                    ",".join(str(key) for key in self.FILE_FORMATS),
                    ",".join(str(key) for key in self.COMPRESSION_FORMATS)
                )
            )

        complete_filename = "{}_*.{}".format(filename, export_format)
        if compression_format:
            complete_filename = "{}.{}".format(
                complete_filename, compression_format)
        return complete_filename

    def table_to_cloud_storage(self, dataset_id, table_id,
                               bucket_name, filename, job_config=None,
                               export_format="csv", compression_format="gz",
                               location="US", **kwargs):
        """Extract a table from BigQuery and send it to Google Storage.

        Args:
            dataset_id: Dataset that holds the source table.
            table_id: Source table.
            bucket_name: Destination bucket.
            filename: Base object name; see ``_complete_filename`` for the
                final name.
            job_config: Optional job configuration. When given, its
                ``compression`` and ``destination_format`` are overwritten
                in place; otherwise a new ``bigquery.ExtractJobConfig`` is
                created.
            export_format: One of the keys of ``FILE_FORMATS``.
            compression_format: One of the keys of ``COMPRESSION_FORMATS``.
            location: Forwarded to the client's ``extract_table``.
            **kwargs: Extra keyword arguments forwarded to
                ``extract_table``.

        Returns:
            Whatever ``result()`` of the extract job yields.

        Raises:
            ValueError: If a format is not supported.
        """
        # Validates both formats before anything is sent to the client.
        complete_filename = self._complete_filename(
            filename, export_format, compression_format)
        destination_uri = "gs://{}/{}".format(bucket_name, complete_filename)
        table = self._client.dataset(dataset_id).table(table_id)

        job_config = job_config or bigquery.ExtractJobConfig()
        job_config.compression = self.COMPRESSION_FORMATS.get(
            compression_format)
        job_config.destination_format = self.FILE_FORMATS.get(export_format)

        extract_job = self._client.extract_table(
            table,
            destination_uri,
            location=location,
            job_config=job_config, **kwargs)
        return extract_job.result()

    def create_dataset(self, dataset_id):
        """Create a dataset, tolerating one that already exists.

        Args:
            dataset_id: Identifier of the dataset to create.

        Returns:
            The return value of the client's ``create_dataset``.
        """
        dataset = bigquery.Dataset(self._client.dataset(dataset_id))
        return self._client.create_dataset(dataset, exists_ok=True)

    def create_table(self, dataset_id, table_id):
        """Create a table (and its dataset), tolerating existing ones.

        Args:
            dataset_id: Dataset that will hold the table; created first via
                ``create_dataset``.
            table_id: Identifier of the table to create.

        Returns:
            The return value of the client's ``create_table``.
        """
        self.create_dataset(dataset_id)
        table_ref = self._client.dataset(dataset_id).table(table_id)
        return self._client.create_table(
            bigquery.Table(table_ref), exists_ok=True)

    def cloud_storage_to_table(self, bucket_name, filename,
                               dataset_id, table_id, job_config=None,
                               import_format="csv", location="US", **kwargs):
        """Load a file from Google Storage into a BigQuery table.

        The destination dataset and table are created first through
        ``create_table``.

        Args:
            bucket_name: Source bucket.
            filename: Source object name inside the bucket.
            dataset_id: Dataset that holds the destination table.
            table_id: Destination table.
            job_config: Optional job configuration. When given, its
                ``source_format`` is overwritten in place; otherwise a new
                ``bigquery.LoadJobConfig`` is created.
            import_format: One of the keys of ``FILE_FORMATS``. An unknown
                value sets ``source_format`` to ``None``.
            location: Forwarded to the client's ``load_table_from_uri``.
            **kwargs: Extra keyword arguments forwarded to
                ``load_table_from_uri``.

        Returns:
            Whatever ``result()`` of the load job yields.
        """
        self.create_table(dataset_id, table_id)
        table_ref = self._client.dataset(dataset_id).table(table_id)

        job_config = job_config or bigquery.LoadJobConfig()
        job_config.source_format = self.FILE_FORMATS.get(import_format)

        load_job = self._client.load_table_from_uri(
            "gs://{}/{}".format(bucket_name, filename),
            table_ref,
            job_config=job_config,
            location=location,
            **kwargs
        )
        return load_job.result()

    def table_exists(self, table_id, dataset_id, project_id=None):
        """Check if a table exists.

        Args:
            table_id: Table to look for.
            dataset_id: Dataset expected to hold the table.
            project_id: Optional project that owns the dataset. When empty,
                the project of ``self._client`` is used.

        Returns:
            ``True`` if the client can fetch the table, ``False`` if the
            lookup raises ``NotFound``.
        """
        # Build the reference from the existing client rather than creating
        # a throwaway ``bigquery.Client``: the lookup below always goes
        # through ``self._client``, so only the project of the reference
        # matters. ``or None`` keeps the fall-back for falsy project ids.
        dataset = self._client.dataset(dataset_id, project=project_id or None)
        table = dataset.table(table_id)
        try:
            self._client.get_table(table)
        except NotFound:
            return False
        return True