feat: Enable tracking of vBucket stream consumption by providing a callback triggered on each DCP event #124
Comments
…ndyol#124 Signed-off-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>
hi @ziollek
Oh, I didn't know about these on-top projects :). Currently, I need to feed application memory from the DCP stream, but I will take a closer look in my spare time.
I opened a PR from your fork (#125), let's go with that 🚀
…ion #124 (#125)
* Implement feature allowing tracking progress of events consuption #124
* fix typo, remove debug print
Signed-off-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>
Co-authored-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>
Why is this beneficial?
Consider an application that exposes an API and:
a. Maintains in-memory aggregations based on data stored in DCP.
b. Needs to determine when it has caught up with historical data before serving requests.
If we use DCP to feed the application with both historical and ongoing data, requirement (b) presents a challenge.
It is possible to fetch the current sequence numbers at startup using `dcp.GetClient().GetVBucketSeqNos()` and then start DCP. However, not all events will be passed to our listener due to some being skipped in `stream.listen`.
Proposed solution
The solution involves replacing the listener callback in the `dcp` struct with an interface that defines two methods:
- `ConsumeEvent(ctx *ListenerContext)`, which maintains compatibility with the existing Listener callback type.
- `TrackOffset(vbID uint16, offset *Offset)`, which will be invoked within `stream.setOffset`.
Backward Compatibility
To maintain backward compatibility, the existing factory method:
will be preserved.
Additionally, a new factory method will be introduced to support the extended interface:
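As a rough illustration of the idea (not the actual factory signatures or library types), the sketch below shows how the two-method interface, a shim for the old callback, and a catch-up check could fit together. `Consumer`, `listenerAdapter`, and `CatchUpTracker` are hypothetical names, and `Offset` and `ListenerContext` are simplified stand-ins for the real go-dcp types; only a `SeqNo` field is assumed.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the library types (assumption: the real Offset
// carries a sequence number; other fields are omitted here).
type Offset struct{ SeqNo uint64 }
type ListenerContext struct{}

// Listener mirrors the existing callback type mentioned in the issue.
type Listener func(ctx *ListenerContext)

// Consumer is a hypothetical name for the proposed two-method interface.
type Consumer interface {
	ConsumeEvent(ctx *ListenerContext)
	TrackOffset(vbID uint16, offset *Offset)
}

// listenerAdapter lets a plain callback satisfy Consumer, which is one way
// the existing factory method could keep accepting a Listener.
type listenerAdapter struct{ listener Listener }

func (a listenerAdapter) ConsumeEvent(ctx *ListenerContext) { a.listener(ctx) }
func (a listenerAdapter) TrackOffset(uint16, *Offset)       {} // offsets are ignored

// CatchUpTracker reports when every vBucket has reached the sequence number
// that was observed at startup.
type CatchUpTracker struct {
	mu      sync.Mutex
	pending map[uint16]uint64 // vbID -> startup seqNo not yet reached
}

func NewCatchUpTracker(targets map[uint16]uint64) *CatchUpTracker {
	pending := make(map[uint16]uint64, len(targets))
	for vbID, seqNo := range targets {
		if seqNo > 0 { // an empty vBucket has nothing to catch up on
			pending[vbID] = seqNo
		}
	}
	return &CatchUpTracker{pending: pending}
}

func (t *CatchUpTracker) ConsumeEvent(ctx *ListenerContext) {
	// In-memory aggregations would be updated here.
}

// TrackOffset is called for every offset change, including events that
// never reach ConsumeEvent, so skipped events still count as progress.
func (t *CatchUpTracker) TrackOffset(vbID uint16, offset *Offset) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if target, ok := t.pending[vbID]; ok && offset.SeqNo >= target {
		delete(t.pending, vbID)
	}
}

func (t *CatchUpTracker) CaughtUp() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.pending) == 0
}

func main() {
	// A legacy callback still works through the adapter.
	var legacy Consumer = listenerAdapter{listener: func(*ListenerContext) {}}
	legacy.ConsumeEvent(&ListenerContext{})

	// In a real application the targets would come from
	// dcp.GetClient().GetVBucketSeqNos() before the stream starts.
	tracker := NewCatchUpTracker(map[uint16]uint64{0: 10, 1: 3})
	var c Consumer = tracker
	c.TrackOffset(0, &Offset{SeqNo: 10})
	fmt.Println(tracker.CaughtUp()) // false: vBucket 1 is still behind
	c.TrackOffset(1, &Offset{SeqNo: 7})
	fmt.Println(tracker.CaughtUp()) // true
}
```

With something like this in place, the API layer could refuse requests until `CaughtUp()` returns true. The mutex is there because offsets for different vBuckets may be reported from different goroutines; whether that is the case depends on the library internals.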
Alternative Approaches
go-dcp