Skip to content

Commit 45dc983

Browse files
authored
feat: update GrpcBlobReadChannel to allow seek/limit after read (#1834)
### Refactoring Extract the majority of BlobReadChannelV2 to a new abstract base class BaseStorageReadChannel which effectively adapts a LazyReadChannel to a StorageReadChannel. BaseStorageReadChannel now defines an abstract method newLazyReadChannel which each implementation is responsible for implementing to integrate into the lifecycle.
1 parent b8f4316 commit 45dc983

File tree

9 files changed

+293
-313
lines changed

9 files changed

+293
-313
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import static com.google.cloud.storage.ByteSizeConstants._2MiB;
20+
import static java.util.Objects.requireNonNull;
21+
22+
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
23+
import java.io.IOException;
24+
import java.nio.ByteBuffer;
25+
import org.checkerframework.checker.nullness.qual.Nullable;
26+
27+
abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
28+
29+
private ByteRangeSpec byteRangeSpec;
30+
private int chunkSize = _2MiB;
31+
private BufferHandle bufferHandle;
32+
private LazyReadChannel<T> lazyReadChannel;
33+
34+
@Nullable private T resolvedObject;
35+
36+
protected BaseStorageReadChannel() {
37+
this.byteRangeSpec = ByteRangeSpec.nullRange();
38+
}
39+
40+
@Override
41+
public final synchronized void setChunkSize(int chunkSize) {
42+
StorageException.wrapIOException(() -> maybeResetChannel(true));
43+
this.chunkSize = chunkSize;
44+
}
45+
46+
@Override
47+
public final synchronized boolean isOpen() {
48+
if (lazyReadChannel == null) {
49+
return true;
50+
} else {
51+
LazyReadChannel<T> tmp = internalGetLazyChannel();
52+
return tmp.isOpen();
53+
}
54+
}
55+
56+
@Override
57+
public final synchronized void close() {
58+
if (internalGetLazyChannel().isOpen()) {
59+
StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close);
60+
}
61+
}
62+
63+
@Override
64+
public final synchronized StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) {
65+
requireNonNull(byteRangeSpec, "byteRangeSpec must be non null");
66+
StorageException.wrapIOException(() -> maybeResetChannel(false));
67+
this.byteRangeSpec = byteRangeSpec;
68+
return this;
69+
}
70+
71+
@Override
72+
public final ByteRangeSpec getByteRangeSpec() {
73+
return byteRangeSpec;
74+
}
75+
76+
@Override
77+
public final synchronized int read(ByteBuffer dst) throws IOException {
78+
long diff = byteRangeSpec.length();
79+
if (diff <= 0) {
80+
close();
81+
return -1;
82+
}
83+
try {
84+
int read = internalGetLazyChannel().getChannel().read(dst);
85+
if (read != -1) {
86+
byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read);
87+
} else {
88+
close();
89+
}
90+
return read;
91+
} catch (StorageException e) {
92+
if (e.getCode() == 416) {
93+
// HttpStorageRpc turns 416 into a null etag with an empty byte array, leading
94+
// BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open.
95+
// Emulate that same behavior here to preserve behavior compatibility, though this should
96+
// be removed in the next major version.
97+
return -1;
98+
} else {
99+
throw new IOException(e);
100+
}
101+
} catch (IOException e) {
102+
throw e;
103+
} catch (Exception e) {
104+
throw new IOException(StorageException.coalesce(e));
105+
}
106+
}
107+
108+
protected final BufferHandle getBufferHandle() {
109+
if (bufferHandle == null) {
110+
bufferHandle = BufferHandle.allocate(chunkSize);
111+
}
112+
return bufferHandle;
113+
}
114+
115+
protected final int getChunkSize() {
116+
return chunkSize;
117+
}
118+
119+
@Nullable
120+
protected T getResolvedObject() {
121+
return resolvedObject;
122+
}
123+
124+
protected void setResolvedObject(@Nullable T resolvedObject) {
125+
this.resolvedObject = resolvedObject;
126+
}
127+
128+
protected abstract LazyReadChannel<T> newLazyReadChannel();
129+
130+
private void maybeResetChannel(boolean freeBuffer) throws IOException {
131+
if (lazyReadChannel != null && lazyReadChannel.isOpen()) {
132+
try (BufferedReadableByteChannel ignore = lazyReadChannel.getChannel()) {
133+
if (bufferHandle != null && !freeBuffer) {
134+
bufferHandle.get().clear();
135+
} else if (freeBuffer) {
136+
bufferHandle = null;
137+
}
138+
lazyReadChannel = null;
139+
}
140+
}
141+
}
142+
143+
private LazyReadChannel<T> internalGetLazyChannel() {
144+
if (lazyReadChannel == null) {
145+
lazyReadChannel = newLazyReadChannel();
146+
}
147+
return lazyReadChannel;
148+
}
149+
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java

+13-123
Original file line numberDiff line numberDiff line change
@@ -16,164 +16,54 @@
1616

1717
package com.google.cloud.storage;
1818

19-
import static com.google.cloud.storage.ByteSizeConstants._2MiB;
20-
import static java.util.Objects.requireNonNull;
21-
2219
import com.google.api.services.storage.Storage;
2320
import com.google.api.services.storage.model.StorageObject;
2421
import com.google.cloud.ReadChannel;
2522
import com.google.cloud.RestorableState;
2623
import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest;
27-
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
2824
import com.google.cloud.storage.spi.v1.StorageRpc;
2925
import com.google.common.base.MoreObjects;
30-
import java.io.IOException;
3126
import java.io.Serializable;
32-
import java.nio.ByteBuffer;
3327
import java.util.Map;
3428
import java.util.Objects;
3529

36-
final class BlobReadChannelV2 implements StorageReadChannel {
30+
final class BlobReadChannelV2 extends BaseStorageReadChannel<StorageObject> {
3731

3832
private final StorageObject storageObject;
3933
private final Map<StorageRpc.Option, ?> opts;
4034
private final BlobReadChannelContext blobReadChannelContext;
4135

42-
private LazyReadChannel<StorageObject> lazyReadChannel;
43-
private StorageObject resolvedObject;
44-
private ByteRangeSpec byteRangeSpec;
45-
46-
private int chunkSize = _2MiB;
47-
private BufferHandle bufferHandle;
48-
4936
BlobReadChannelV2(
5037
StorageObject storageObject,
5138
Map<StorageRpc.Option, ?> opts,
5239
BlobReadChannelContext blobReadChannelContext) {
5340
this.storageObject = storageObject;
5441
this.opts = opts;
5542
this.blobReadChannelContext = blobReadChannelContext;
56-
this.byteRangeSpec = ByteRangeSpec.nullRange();
57-
}
58-
59-
@Override
60-
public synchronized void setChunkSize(int chunkSize) {
61-
StorageException.wrapIOException(() -> maybeResetChannel(true));
62-
this.chunkSize = chunkSize;
63-
}
64-
65-
@Override
66-
public synchronized boolean isOpen() {
67-
if (lazyReadChannel == null) {
68-
return true;
69-
} else {
70-
LazyReadChannel<StorageObject> tmp = internalGetLazyChannel();
71-
return tmp.isOpen();
72-
}
73-
}
74-
75-
@Override
76-
public synchronized void close() {
77-
if (internalGetLazyChannel().isOpen()) {
78-
StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close);
79-
}
80-
}
81-
82-
@Override
83-
public synchronized StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) {
84-
requireNonNull(byteRangeSpec, "byteRangeSpec must be non null");
85-
StorageException.wrapIOException(() -> maybeResetChannel(false));
86-
this.byteRangeSpec = byteRangeSpec;
87-
return this;
88-
}
89-
90-
@Override
91-
public ByteRangeSpec getByteRangeSpec() {
92-
return byteRangeSpec;
93-
}
94-
95-
@Override
96-
public synchronized int read(ByteBuffer dst) throws IOException {
97-
long diff = byteRangeSpec.length();
98-
if (diff <= 0) {
99-
close();
100-
return -1;
101-
}
102-
try {
103-
int read = internalGetLazyChannel().getChannel().read(dst);
104-
if (read != -1) {
105-
byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read);
106-
} else {
107-
close();
108-
}
109-
return read;
110-
} catch (StorageException e) {
111-
if (e.getCode() == 416) {
112-
// HttpStorageRpc turns 416 into a null etag with an empty byte array, leading
113-
// BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open.
114-
// Emulate that same behavior here to preserve behavior compatibility, though this should
115-
// be removed in the next major version.
116-
return -1;
117-
} else {
118-
throw new IOException(e);
119-
}
120-
} catch (IOException e) {
121-
throw e;
122-
} catch (Exception e) {
123-
throw new IOException(StorageException.coalesce(e));
124-
}
12543
}
12644

12745
@Override
12846
public RestorableState<ReadChannel> capture() {
12947
ApiaryReadRequest apiaryReadRequest = getApiaryReadRequest();
13048
return new BlobReadChannelV2State(
131-
apiaryReadRequest, blobReadChannelContext.getStorageOptions(), chunkSize);
49+
apiaryReadRequest, blobReadChannelContext.getStorageOptions(), getChunkSize());
13250
}
13351

134-
private void maybeResetChannel(boolean umallocBuffer) throws IOException {
135-
if (lazyReadChannel != null && lazyReadChannel.isOpen()) {
136-
try (BufferedReadableByteChannel ignore = lazyReadChannel.getChannel()) {
137-
if (bufferHandle != null && !umallocBuffer) {
138-
bufferHandle.get().clear();
139-
} else if (umallocBuffer) {
140-
bufferHandle = null;
141-
}
142-
lazyReadChannel = null;
143-
}
144-
}
145-
}
146-
147-
private LazyReadChannel<StorageObject> internalGetLazyChannel() {
148-
if (lazyReadChannel == null) {
149-
lazyReadChannel = newLazyReadChannel();
150-
}
151-
return lazyReadChannel;
152-
}
153-
154-
private LazyReadChannel<StorageObject> newLazyReadChannel() {
52+
protected LazyReadChannel<StorageObject> newLazyReadChannel() {
15553
return new LazyReadChannel<>(
156-
() -> {
157-
if (bufferHandle == null) {
158-
bufferHandle = BufferHandle.allocate(chunkSize);
159-
}
160-
return ResumableMedia.http()
161-
.read()
162-
.byteChannel(blobReadChannelContext)
163-
.setCallback(this::setResolvedObject)
164-
.buffered(bufferHandle)
165-
.setApiaryReadRequest(getApiaryReadRequest())
166-
.build();
167-
});
168-
}
169-
170-
private void setResolvedObject(StorageObject resolvedObject) {
171-
this.resolvedObject = resolvedObject;
54+
() ->
55+
ResumableMedia.http()
56+
.read()
57+
.byteChannel(blobReadChannelContext)
58+
.setCallback(this::setResolvedObject)
59+
.buffered(getBufferHandle())
60+
.setApiaryReadRequest(getApiaryReadRequest())
61+
.build());
17262
}
17363

17464
private ApiaryReadRequest getApiaryReadRequest() {
175-
StorageObject object = resolvedObject != null ? resolvedObject : storageObject;
176-
return new ApiaryReadRequest(object, opts, byteRangeSpec);
65+
StorageObject object = getResolvedObject() != null ? getResolvedObject() : storageObject;
66+
return new ApiaryReadRequest(object, opts, getByteRangeSpec());
17767
}
17868

17969
static class BlobReadChannelV2State implements RestorableState<ReadChannel>, Serializable {

0 commit comments

Comments
 (0)