WIP: google cloud storage class #252
@@ -1886,3 +1886,181 @@ def __delitem__(self, key):
        with self._mutex:
            self._invalidate_keys()
            self._invalidate_value(key)


# utility functions for object stores


def _strip_prefix_from_path(path, prefix):
    # normalized things will not have any leading or trailing slashes
    path_norm = normalize_storage_path(path)
    prefix_norm = normalize_storage_path(prefix)
    if path_norm.startswith(prefix_norm):
        return path_norm[(len(prefix_norm)+1):]
    else:
        return path


def _append_path_to_prefix(path, prefix):
    return '/'.join([normalize_storage_path(prefix),
                     normalize_storage_path(path)])


def atexit_rmgcspath(bucket, path):
    from google.cloud import storage
    client = storage.Client()
    bucket = client.get_bucket(bucket)
    bucket.delete_blobs(bucket.list_blobs(prefix=path))


class GCSStore(MutableMapping):
    """Storage class using Google Cloud Storage (GCS).

    Parameters
    ----------
    bucket_name : string
        The name of the GCS bucket
    prefix : string, optional
        The prefix within the bucket (i.e. subdirectory)
    client_kwargs : dict, optional
        Extra options passed to ``google.cloud.storage.Client`` when connecting
        to GCS

    Notes
    -----
    In order to use this store, you must install the Google Cloud Storage
    `Python Client Library <https://cloud.google.com/storage/docs/reference/libraries>`_.
    You must also provide valid application credentials, either by setting the
    ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable or via
    `default credentials <https://cloud.google.com/sdk/gcloud/reference/auth/application-default/login>`_.
    """

    def __init__(self, bucket_name, prefix=None, client_kwargs={}):
        self.bucket_name = bucket_name
        self.prefix = normalize_storage_path(prefix)
        self.client_kwargs = client_kwargs
        self.initialize_bucket()

    def initialize_bucket(self):
        from google.cloud import storage
        # run `gcloud auth application-default login` from shell
        client = storage.Client(**self.client_kwargs)
        self.bucket = client.get_bucket(self.bucket_name)
Review comment: Note that it's also possible to do:

    self.bucket = storage.Bucket(client, name=self.bucket_name)

...which involves no network communication. Not sure this is a good idea in general, as you may want to retrieve the bucket info, but just mentioning.
        # need to properly handle exceptions
        import google.api_core.exceptions as exceptions
        self.exceptions = exceptions
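For reference, a rough, untested sketch of how `initialize_bucket` could look with the lazier construction suggested in the comment above. The explicit `exists()` check is an extra assumption added here so that a missing bucket still fails fast; none of this is part of the PR's diff:

    def initialize_bucket(self):
        from google.cloud import storage
        client = storage.Client(**self.client_kwargs)
        # Bucket() only builds a local reference and makes no HTTP request,
        # unlike client.get_bucket()
        self.bucket = storage.Bucket(client, name=self.bucket_name)
        # optional: verify the bucket up front (costs one extra request)
        if not self.bucket.exists():
            raise ValueError('bucket %r not found' % self.bucket_name)
        import google.api_core.exceptions as exceptions
        self.exceptions = exceptions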
    # needed for pickling
    def __getstate__(self):
        state = self.__dict__.copy()
        del state['bucket']
        del state['exceptions']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.initialize_bucket()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    def full_path(self, path=None):
        return _append_path_to_prefix(path, self.prefix)

    def list_gcs_directory_blobs(self, path):
        """Return list of all blobs *directly* under a gcs prefix."""
        prefix = normalize_storage_path(path) + '/'
        return [blob.name for blob in
                self.bucket.list_blobs(prefix=prefix, delimiter='/')]

    # from https://github.com/GoogleCloudPlatform/google-cloud-python/issues/920
    def list_gcs_subdirectories(self, path):
        """Return set of all "subdirectories" from a gcs prefix."""
        prefix = normalize_storage_path(path) + '/'
        iterator = self.bucket.list_blobs(prefix=prefix, delimiter='/')
        prefixes = set()
        for page in iterator.pages:
            prefixes.update(page.prefixes)
        # need to strip trailing slash to be consistent with os.listdir
        return [path[:-1] for path in prefixes]

    def list_gcs_directory(self, prefix, strip_prefix=True):
        """Return a list of all blobs and subdirectories from a gcs prefix."""
        items = set()
        items.update(self.list_gcs_directory_blobs(prefix))
        items.update(self.list_gcs_subdirectories(prefix))
        items = list(items)
        if strip_prefix:
            items = [_strip_prefix_from_path(path, prefix) for path in items]
        return items

    def listdir(self, path=None):
        dir_path = self.full_path(path)
        return sorted(self.list_gcs_directory(dir_path, strip_prefix=True))

    def rmdir(self, path=None):
        # make sure it's a directory
        dir_path = normalize_storage_path(self.full_path(path)) + '/'
        self.bucket.delete_blobs(self.bucket.list_blobs(prefix=dir_path))

    def getsize(self, path=None):
        # this function should *not* be recursive
        # a lot of slash trickery is required to make this work right
        full_path = self.full_path(path)
        blob = self.bucket.get_blob(full_path)
        if blob is not None:
            return blob.size
        else:
            dir_path = normalize_storage_path(full_path) + '/'
            blobs = self.bucket.list_blobs(prefix=dir_path, delimiter='/')
            size = 0
            for blob in blobs:
                size += blob.size
            return size

    def clear(self):
        self.rmdir()

    def __getitem__(self, key):
        blob_name = self.full_path(key)
        blob = self.bucket.get_blob(blob_name)
Review comment: An alternative here is to do:

    from google.cloud import storage
    blob = storage.Blob(blob_name, self.bucket)

...which involves less network communication (profiling shows number of calls to …). If this change was made, some rethinking of error handling may be needed, as the point at which a non-existing blob was detected might change.
        if blob:
            return blob.download_as_string()
        else:
            raise KeyError('Blob %s not found' % blob_name)
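A rough, untested sketch of the alternative suggested in the comment above, with the missing-key handling moved from `get_blob` to the download call itself; it is not part of the PR's diff:

    def __getitem__(self, key):
        from google.cloud import storage
        blob_name = self.full_path(key)
        # build a Blob reference locally, without a round trip to GCS
        blob = storage.Blob(blob_name, self.bucket)
        try:
            return blob.download_as_string()
        except self.exceptions.NotFound:
            # a missing blob is only detected when the download is attempted
            raise KeyError('Blob %s not found' % blob_name)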
    def __setitem__(self, key, value):
        blob_name = self.full_path(key)
        blob = self.bucket.blob(blob_name)
        blob.upload_from_string(value)

    def __delitem__(self, key):
        blob_name = self.full_path(key)
        try:
            self.bucket.delete_blob(blob_name)
        except self.exceptions.NotFound as er:
            raise KeyError(er.message)

    def __contains__(self, key):
        blob_name = self.full_path(key)
        return self.bucket.get_blob(blob_name) is not None

    def __eq__(self, other):
        return (
            isinstance(other, GCSStore) and
            self.bucket_name == other.bucket_name and
            self.prefix == other.prefix
        )

    def __iter__(self):
        blobs = self.bucket.list_blobs(prefix=self.prefix)
        for blob in blobs:
            yield _strip_prefix_from_path(blob.name, self.prefix)

    def __len__(self):
        iterator = self.bucket.list_blobs(prefix=self.prefix)
        return len(list(iterator))
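To make the intended use concrete, here is a minimal usage sketch along the lines of the class docstring. The import path, bucket name and prefix are placeholders, and it assumes the bucket already exists and is writable with your application credentials:

    import zarr
    from zarr.storage import GCSStore  # assuming the class lands in zarr.storage

    store = GCSStore('my-bucket', prefix='my-dataset.zarr')  # example values only
    root = zarr.group(store=store, overwrite=True)
    z = root.zeros('foo', shape=(10000, 10000), chunks=(1000, 1000), dtype='i4')
    z[:] = 42

Because `GCSStore` is a `MutableMapping` from keys to bytes, it can be passed anywhere zarr accepts a store.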
Review comment: Might be worth adding an option to use an anonymous client. E.g., add an `anonymous=False` keyword argument, then make use of `storage.Client.create_anonymous_client()` when it comes to creating the client if the user has provided `anonymous=True`.
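A possible sketch of that suggestion; the `anonymous` keyword and its handling below are hypothetical, not part of this PR:

    def __init__(self, bucket_name, prefix=None, anonymous=False,
                 client_kwargs={}):
        self.bucket_name = bucket_name
        self.prefix = normalize_storage_path(prefix)
        self.anonymous = anonymous
        self.client_kwargs = client_kwargs
        self.initialize_bucket()

    def initialize_bucket(self):
        from google.cloud import storage
        if self.anonymous:
            # read-only access without credentials; only works for buckets
            # that are publicly readable
            client = storage.Client.create_anonymous_client()
        else:
            client = storage.Client(**self.client_kwargs)
        # note: get_bucket() fetches bucket metadata, so for anonymous access
        # the bucket metadata itself must be publicly readable
        self.bucket = client.get_bucket(self.bucket_name)
        import google.api_core.exceptions as exceptions
        self.exceptions = exceptions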