Reworked codec pipelines #1670
Conversation
Force-pushed from 5d4d09a to 450bcc6
@normanrz Overall, this looks quite good to me. A couple of questions I had:
I refactored the codec pipeline quite a bit in the last commit.
Currently, the choice of codec pipeline is hard-coded. I am still looking for a way to specify that. Should that go into …
Force-pushed from 6c8c706 to 019ecc8
I think this PR is ready for a final review. I updated the PR description with the major changes. The only thing missing from my POV is the user-configurable batch size and codec pipeline selection. I'll add that after #1855 lands.
src/zarr/codecs/pipeline/batched.py
@dataclass(frozen=True)
class BatchedCodecPipeline(CodecPipeline):
Since CodecPipeline defines read_batch and write_batch, what is the relationship between those methods and the new functionality offered by this class? I think a clarifying docstring for the class might be useful here, because I don't find the inheritance relationship intuitive (and ditto for the HybridPipeline -- I have no intuition for what that one is for :) )
```diff
@@ -25,7 +25,7 @@ class Endian(Enum):
 @dataclass(frozen=True)
-class BytesCodec(ArrayBytesCodec):
+class BytesCodec(ArrayBytesCodecBatchMixin):
```
Will ArrayBytesCodec be used anywhere other than as the base class for ArrayBytesCodecBatchMixin? If not, we might want to consider simplifying the inheritance structure a bit.
I would anticipate that folks want to build a codec that implements their own batching, e.g. in Rust or on the GPU. That is why we should keep both classes.

I think it would be helpful to have some docstrings. At least, I find it hard to follow the intention without any help :)
I added a few docstrings and implemented the abstract Codec classes with Generics.
@normanrz -- this is an impressive piece of work. I should admit that the size of it made it hard to review, so I just have a few comments. Overall, I think it's a big step forward and I want to get my hands on it, so I favor moving it into the v3 branch ASAP.
This batched codec pipeline divides the chunk batches into batches of a configurable
batch size ("mini-batch"). Fetching, decoding, encoding and storing are performed in
lock step for each mini-batch. Multiple mini-batches are processed concurrently.
"""
this can come later but we're going to want some additional documentation on the behavior here. Reading this, I'm not entirely sure I get it.
Returns
-------
ArraySpec
"""
return chunk_spec

def evolve(self, array_spec: ArraySpec) -> Self:
this doesn't need to be addressed in this PR, but the docstring for this method doesn't describe the behavior I would expect from a method called "evolve" -- my intuition is that object.evolve(property=new_val) would return a copy of object, with property set to new_val, which I think is consistent with how it works in attrs. But codec.evolve here is rather different. Based on this docstring, I would think this method should be called "from_array_spec" or something, to make it clear that we are getting a new codec instance from an array spec (and it would make sense to use .evolve in this method, of course).
What about naming the function evolve_from_array_spec?
I think that works
alternatively, we could make from_array_spec a class method that takes keyword arguments to cover the attributes that the input array spec doesn't convey
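To illustrate the naming being proposed: an `evolve_from_array_spec` method derives a new codec instance from an array spec, with the attrs-style "evolve" step done via `dataclasses.replace`. The `EndianCodec` class and its adaptation rule below are hypothetical, invented only to show the shape of the API:

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ArraySpec:
    # Minimal stand-in for zarr's ArraySpec; the real class has more fields.
    dtype: str


@dataclass(frozen=True)
class EndianCodec:
    # Hypothetical codec, used only to illustrate the naming discussion.
    endian: str = "little"

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> "EndianCodec":
        # Return a copy of this codec adapted to the array spec; the
        # copy-with-new-value step is dataclasses.replace.
        if array_spec.dtype.startswith(">"):
            return replace(self, endian="big")
        return self
```

The original instance stays unchanged; only the returned copy reflects the array spec.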
CodecOutput = TypeVar("CodecOutput", bound=np.ndarray | BytesLike)

async def batching_helper(
this is a very naive question, but given that this function exists, why do we need to implement batching by writing new methods for codec classes? Unless I'm missing something, the new batching methods just wrap batching_helper around the base encode / decode functionality defined per-codec.
to put the point differently, why can't the codec pipeline class implement batching by calling batching_helper on the encode / decode methods of the codecs it contains?
This doesn't need to be addressed here, so feel free to ignore for now
How would you feel about this?
```python
class _Codec(Generic[CodecInput, CodecOutput], Metadata):
    ...
    async def _decode_single(self, chunk_data: CodecOutput, chunk_spec: ArraySpec) -> CodecInput:
        raise NotImplementedError

    async def decode(
        self, chunk_data_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]]
    ) -> Iterable[CodecInput | None]:
        return await batching_helper(self._decode_single, chunk_data_and_specs)

    # same for encode
    ...
```
Batch-aware codecs would then only override the decode method and ignore the _decode_single method. That would be fine because _decode_single is a protected method not intended to be part of the public interface. Single-chunk codecs could override _decode_single and won't have to care about the batching. We could drop the batch mixins, then.
sounds good!
Co-authored-by: Joe Hamman <joe@earthmover.io>
* adds wrapper codecs for the v2 codec pipeline
* encode_chunk_key
* refactor ArrayV2 away
* empty zattrs
* Apply suggestions from code review
* unify ArrayMetadata
* abstract ArrayMetadata
* unified Array.create
* use zarr.config for batch_size
* update __init__.py
* ruff

Co-authored-by: Davis Bennett <davis.v.bennett@gmail.com>
Co-authored-by: Joe Hamman <joe@earthmover.io>
```python
@dataclass(frozen=True)
class V2Compressor(ArrayBytesCodecBatchMixin):
    compressor: dict[str, JSON] | None

    is_fixed_size = False

    async def decode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        if chunk_bytes is None:
            return None

        if self.compressor is not None:
            compressor = numcodecs.get_codec(self.compressor)
            chunk_numpy_array = ensure_ndarray(
                await to_thread(compressor.decode, chunk_bytes.as_array_like())
            )
        else:
            chunk_numpy_array = ensure_ndarray(chunk_bytes.as_array_like())

        # ensure correct dtype
        if str(chunk_numpy_array.dtype) != chunk_spec.dtype:
            chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype)

        return NDBuffer.from_numpy_array(chunk_numpy_array)

    async def encode_single(
        self,
        chunk_array: NDBuffer,
        _chunk_spec: ArraySpec,
    ) -> Buffer | None:
        chunk_numpy_array = chunk_array.as_numpy_array()
        if self.compressor is not None:
            compressor = numcodecs.get_codec(self.compressor)
            if (
                not chunk_numpy_array.flags.c_contiguous
                and not chunk_numpy_array.flags.f_contiguous
            ):
                chunk_numpy_array = chunk_numpy_array.copy(order="A")
            encoded_chunk_bytes = ensure_bytes(
                await to_thread(compressor.encode, chunk_numpy_array)
            )
        else:
            encoded_chunk_bytes = ensure_bytes(chunk_numpy_array)

        return Buffer.from_bytes(encoded_chunk_bytes)

    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        raise NotImplementedError


@dataclass(frozen=True)
class V2Filters(ArrayArrayCodecBatchMixin):
    filters: list[dict[str, JSON]]

    is_fixed_size = False

    async def decode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        chunk_numpy_array = chunk_array.as_numpy_array()
        # apply filters in reverse order
        if self.filters is not None:
            for filter_metadata in self.filters[::-1]:
                filter = numcodecs.get_codec(filter_metadata)
                chunk_numpy_array = await to_thread(filter.decode, chunk_numpy_array)

        # ensure correct chunk shape
        if chunk_numpy_array.shape != chunk_spec.shape:
            chunk_numpy_array = chunk_numpy_array.reshape(
                chunk_spec.shape,
                order=chunk_spec.order,
            )

        return NDBuffer.from_numpy_array(chunk_numpy_array)

    async def encode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer | None:
        chunk_numpy_array = chunk_array.as_numpy_array().ravel(order=chunk_spec.order)

        for filter_metadata in self.filters:
            filter = numcodecs.get_codec(filter_metadata)
            chunk_numpy_array = await to_thread(filter.encode, chunk_numpy_array)

        return NDBuffer.from_numpy_array(chunk_numpy_array)
```
@madsbk I added a very naive Buffer integration in these codecs. Could you please help me get that right?
I'll merge this PR for now. We can work on a new PR to optimize this.
Sounds good!
This PR refactors the codec pipelines in the v3 codebase. There is now a new default implementation: BatchedCodecPipeline, which divides the chunk batches into configurable "mini-batches". Within a mini-batch all steps are run in lock step (e.g. fetching from store, decoding, encoding, writing to store). Multiple mini-batches are processed concurrently.

This PR moves a lot of code from the Array to the codec pipeline, which is an opportunity to share more code between the Array and the ShardingCodec. To make that work, new ByteGetter and ByteSetter protocols are introduced to generalize the existing StorePath.

It also changes the Codec API by making the decode and encode methods take chunk batches.

TODO: