feat: Enable tracking of vBucket stream consumption by providing a callback triggered on each DCP event #124

Closed
ziollek opened this issue Feb 12, 2025 · 3 comments

Comments

@ziollek
Contributor

ziollek commented Feb 12, 2025

Why is this beneficial?

Consider an application that exposes an API and:
a. Maintains in-memory aggregations based on data streamed over DCP.
b. Needs to determine when it has caught up with historical data before serving requests.

If we use DCP to feed the application with both historical and ongoing data, requirement (b) presents a challenge.

It is possible to fetch the current sequence numbers at startup using dcp.GetClient().GetVBucketSeqNos() and then start DCP. However, not every event reaches our listener, because some are skipped in stream.listen, so the listener cannot reliably observe when each vBucket has reached those sequence numbers.

Proposed solution

The solution involves replacing the listener callback in the dcp struct with an interface that defines two methods:

  • ConsumeEvent(ctx *ListenerContext), which maintains compatibility with the existing Listener callback type.
  • TrackOffset(vbID uint16, offset *Offset), which will be invoked within stream.setOffset.

type Consumer interface {
	ConsumeEvent(ctx *ListenerContext)
	TrackOffset(vbID uint16, offset *Offset)
}
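
To illustrate how TrackOffset could answer requirement (b), here is a minimal sketch of a consumer that signals once every vBucket has reached the sequence number captured at startup. It is not taken from go-dcp: CatchUpTracker, NewCatchUpTracker and Ready are hypothetical names, and Offset and ListenerContext are local stand-ins for the library's types (only a SeqNo field is assumed). It also assumes that TrackOffset is eventually called for every vBucket that has data.

```go
package main

import "sync"

// Offset is a local stand-in for go-dcp's offset type. Only SeqNo is
// assumed here; the real struct may carry more fields.
type Offset struct {
	SeqNo uint64
}

// ListenerContext is a local stand-in for go-dcp's listener context.
type ListenerContext struct{}

// Consumer mirrors the interface proposed in this issue.
type Consumer interface {
	ConsumeEvent(ctx *ListenerContext)
	TrackOffset(vbID uint16, offset *Offset)
}

// CatchUpTracker (hypothetical) closes its ready channel once every vBucket
// has reached the sequence number recorded at startup.
type CatchUpTracker struct {
	mu      sync.Mutex
	pending map[uint16]uint64 // vbID -> target seqNo not yet reached
	ready   chan struct{}
	once    sync.Once
}

// NewCatchUpTracker takes the per-vBucket sequence numbers fetched before
// the stream was started.
func NewCatchUpTracker(targets map[uint16]uint64) *CatchUpTracker {
	t := &CatchUpTracker{
		pending: make(map[uint16]uint64),
		ready:   make(chan struct{}),
	}
	for vbID, seqNo := range targets {
		if seqNo > 0 { // an empty vBucket has nothing to catch up on
			t.pending[vbID] = seqNo
		}
	}
	if len(t.pending) == 0 {
		t.once.Do(func() { close(t.ready) })
	}
	return t
}

// ConsumeEvent is where the application would update its aggregations.
func (t *CatchUpTracker) ConsumeEvent(ctx *ListenerContext) {}

// TrackOffset marks a vBucket as caught up once its offset reaches the target.
func (t *CatchUpTracker) TrackOffset(vbID uint16, offset *Offset) {
	if offset == nil {
		return
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	if target, ok := t.pending[vbID]; ok && offset.SeqNo >= target {
		delete(t.pending, vbID)
	}
	if len(t.pending) == 0 {
		t.once.Do(func() { close(t.ready) })
	}
}

// Ready returns a channel that is closed when all vBuckets have caught up.
func (t *CatchUpTracker) Ready() <-chan struct{} {
	return t.ready
}
```

An API server could block on <-tracker.Ready() before it starts accepting requests. Because offsets are tracked even for events that never reach the listener, the tracker does not depend on which events stream.listen skips.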

Backward Compatibility

To maintain backward compatibility, the existing factory method:

func NewDcp(cfg any, listener models.Listener) (Dcp, error)

will be preserved.

Additionally, a new factory method will be introduced to support the extended interface:

func NewExtendedDcp(cfg any, consumer models.Consumer) (Dcp, error)
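
One way the two factory methods could share a single code path is to wrap the legacy callback in an adapter that satisfies Consumer and ignores offsets. The sketch below is only an illustration of that idea, not the actual go-dcp implementation: listenerAdapter and AdaptListener are hypothetical names, the types are local stand-ins, and Listener is assumed to be a function taking *ListenerContext.

```go
package main

// ListenerContext and Offset are local stand-ins for go-dcp's types.
type ListenerContext struct{}

type Offset struct {
	SeqNo uint64
}

// Listener is assumed to match the existing callback type.
type Listener func(ctx *ListenerContext)

// Consumer mirrors the interface proposed in this issue.
type Consumer interface {
	ConsumeEvent(ctx *ListenerContext)
	TrackOffset(vbID uint16, offset *Offset)
}

// listenerAdapter (hypothetical) lets a plain Listener satisfy Consumer.
type listenerAdapter struct {
	listener Listener
}

// ConsumeEvent forwards the event to the wrapped callback unchanged.
func (a listenerAdapter) ConsumeEvent(ctx *ListenerContext) {
	a.listener(ctx)
}

// TrackOffset is a no-op: legacy listeners do not observe offsets.
func (a listenerAdapter) TrackOffset(vbID uint16, offset *Offset) {}

// AdaptListener wraps a legacy Listener so it can be passed wherever a
// Consumer is expected.
func AdaptListener(l Listener) Consumer {
	return listenerAdapter{listener: l}
}
```

With such an adapter, NewDcp could delegate to NewExtendedDcp by passing AdaptListener(listener), so existing callers would see no change in behavior.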

Alternative Approaches

  • Modifying event-passing logic: Adjusting how events are passed to the listener to provide more control over which events should be consumed and to determine offsets within the listener callback.
    • Downside: This approach significantly impacts the streaming behavior and poses challenges for backward compatibility.
  • Fetching data from Couchbase during the warm-up stage: Retrieving the necessary data using alternative mechanisms (e.g., SQL++) before starting the DCP stream.
    • Downside: This adds complexity to an otherwise straightforward approach that relies solely on go-dcp.
ziollek added a commit to ziollek/go-dcp that referenced this issue Feb 12, 2025
…ndyol#124

Signed-off-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>
@erayarslan
Member

erayarslan commented Feb 12, 2025

Hi @ziollek,
thank you so much for your feature request and contributions! 🚀
I think it's awesome, because we also need this kind of flexibility, for example to know when our reindex job has caught up with the current system.
I will review this ASAP.
By the way, did you consider implementing this feature in the connectors?
Below are our current connectors that use go-dcp:

go-dcp-elasticsearch
go-dcp-kafka
go-dcp-couchbase
go-dcp-sql

ziollek added a commit to ziollek/go-dcp that referenced this issue Feb 13, 2025
…ndyol#124

Signed-off-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>
ziollek added a commit to ziollek/go-dcp that referenced this issue Feb 13, 2025
…ndyol#124

Signed-off-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>
@ziollek
Contributor Author

ziollek commented Feb 13, 2025

Oh, I didn't know about these projects built on top of go-dcp :). Currently, I need to feed application memory from the DCP stream, but I will take a closer look in my spare time.
I've prepared the changes in my fork; it is quite a straightforward adjustment.

@erayarslan
Member

I opened a PR from your fork, #125. Let's go with that 🚀

erayarslan added a commit that referenced this issue Feb 13, 2025
…ion #124 (#125)

* Implement feature allowing tracking progress of events consumption #124

Signed-off-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>

* fix typo, remove debug print

---------

Signed-off-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>
Co-authored-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>