Add docs for the changefeed kafka header option #19588
Conversation
cc @rohan-joshi
{% include_cached new-in.html version="v25.2" %} Use the `headers_json_column_name` option to specify a [`JSONB`]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed will emit as a Kafka header for each row. You can send metadata, such as routing or tracing information, at the protocol level in the header, separate from the message payload. This allows Kafka brokers or routers to filter on the header metadata without deserializing the payload.
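As a minimal sketch of how the option is wired up, assuming the `customer_updates` table used in the examples below has a `JSONB` column named `kafka_headers` (the column name and schema here are illustrative assumptions, not from the PR diff):

{% include_cached copy-clipboard.html %}
~~~ sql
-- Hypothetical table: the JSONB column holds per-row Kafka headers.
CREATE TABLE customer_updates (
  update_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  customer_id UUID NOT NULL,
  change_description STRING,
  kafka_headers JSONB  -- emitted as Kafka headers, not in the message payload
);

-- Point the changefeed at Kafka and name the headers column.
CREATE CHANGEFEED FOR TABLE customer_updates
  INTO 'kafka://localhost:9092'
  WITH headers_json_column_name = 'kafka_headers';
~~~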
{{site.data.alerts.callout_info}}
The `headers_json_column_name` option is supported with changefeeds emitting [JSON](#json) or [Avro](#avro) messages to [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}).
{{site.data.alerts.end}}
> **Review comment:** It's Kafka-only, and Kafka only supports those two formats, but the formats have nothing to do with the headers option.
The Kafka topic receives the message payload containing the row-level change:
~~~json
...
(5 rows)
~~~

> **Review comment:** Can you also show the headers that would be received by Kafka, rather than a query that simulates them? Perhaps something like:
>
> | key | value | headers  |
> |-----|-------|----------|
> | A   | {..}  | x=y, z=q |
You may need to duplicate fields between the message envelope and the headers to support efficient routing and filtering by intermediate systems, such as Kafka brokers, stream processors, or observability tools, while still maintaining the full context of the change in the message for downstream applications.
> **Review comment:** What is this referring to?

> **Review comment:** Can you also note use cases like distributed tracing?
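To make the tracing use case concrete, here is a hedged sketch (reusing the hypothetical `kafka_headers` column from above) of a row that duplicates a routing field into the headers and also carries W3C Trace Context for distributed tracing:

{% include_cached copy-clipboard.html %}
~~~ sql
-- source_system is duplicated into the headers for broker-side routing;
-- traceparent propagates distributed-tracing context (W3C Trace Context).
INSERT INTO customer_updates (customer_id, change_description, kafka_headers)
VALUES (
  '5896dc90-a972-43e8-b69b-8b5a52691ce2',
  'Updated phone number',
  '{"source_system": "crm_mobile", "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}'
);
~~~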
{% include_cached copy-clipboard.html %}
~~~ sql
INSERT INTO customer_updates (
  ...
~~~
> **Review comment:** Can you make an example where the data isn't as duplicated?
The Kafka topic receives the message payload containing the row-level change:
~~~json
{"after": {"change_description": "Updated phone number", "change_version": "v2", "customer_id": "5896dc90-a972-43e8-b69b-8b5a52691ce2", "operation_type": "update", "source_system": "crm_mobile", "update_id": "39a7bb4c-ee3b-4897-88fd-cfed94558e72", "updated_at": "2025-05-06T14:57:42.378814Z"}}
~~~
> **Review comment:** Can you call out that the headers column is omitted from the payload?
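For illustration, a hedged sketch of what a consumer might see for this message, assuming `update_id` is the primary key (and therefore the Kafka message key) and the hypothetical headers column held `{"source_system": "crm_mobile"}`:

| key | value | headers |
|-----|-------|---------|
| `["39a7bb4c-ee3b-4897-88fd-cfed94558e72"]` | `{"after": {...}}` | `source_system=crm_mobile` |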
@@ -122,6 +122,7 @@ Option | Value | Description
<a name="format"></a>`format` | `json` / `avro` / `csv` / `parquet` | Format of the emitted message. <br><br>`avro`: For mappings of CockroachDB types to Avro types, [refer to the table]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-types) and detail on [Avro limitations]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-limitations). **Note:** [`confluent_schema_registry`](#confluent-schema-registry) is required with `format=avro`. <br><br>`csv`: You cannot combine `format=csv` with the [`diff`](#diff) or [`resolved`](#resolved) options. Changefeeds use the same CSV format as the [`EXPORT`](export.html) statement. Refer to [Export data with changefeeds]({% link {{ page.version.version }}/export-data-with-changefeeds.md %}) for details on using these options to create a changefeed as an alternative to `EXPORT`. **Note:** [`initial_scan = 'only'`](#initial-scan) is required with `format=csv`. <br><br>`parquet`: Cloud storage is the only supported sink. The [`topic_in_value`](#topic-in-value) option is not compatible with `parquet` format.<br><br>Default: `format=json`.
<a name="full-table-name"></a>`full_table_name` | N/A | Use fully qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases.<br><br>**Note:** This option cannot modify existing table names used as topics, subjects, etc., as part of an [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) statement. To modify a topic, subject, etc., to use a fully qualified table name, create a new changefeed with this option. <br><br>Example: `CREATE CHANGEFEED FOR foo... WITH full_table_name` will create the topic name `defaultdb.public.foo` instead of `foo`.
<a name="gc-protect-expires-after"></a>`gc_protect_expires_after` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Automatically expires protected timestamp records that are older than the defined duration. In the case where a changefeed job remains paused, `gc_protect_expires_after` will trigger the underlying protected timestamp record to expire and cancel the changefeed job to prevent accumulation of protected data.<br><br>Refer to [Protect Changefeed Data from Garbage Collection]({% link {{ page.version.version }}/protect-changefeed-data.md %}) for more detail on protecting changefeed data.
<a name="headers-json-column-name"></a><span class="version-tag">New in v25.2:</span> `headers_json_column_name` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specify a [`JSONB`]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed will emit as a Kafka header for each row. Supported for changefeeds emitting JSON or Avro messages to Kafka sinks. For more details, refer to [Specify a column as a Kafka header]({% link {{ page.version.version }}/changefeed-messages.md %}#specify-a-column-as-a-kafka-header).
> **Review comment:** Ditto re: format.
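As a hedged sketch tying these table rows together (the sink URI and schema registry address are placeholders, and `kafka_headers` is the hypothetical column from above): per the table, `format=avro` requires `confluent_schema_registry`, and the new headers option can be combined with either supported format:

{% include_cached copy-clipboard.html %}
~~~ sql
-- format=avro requires a schema registry, per the options table above.
CREATE CHANGEFEED FOR TABLE customer_updates
  INTO 'kafka://localhost:9092'
  WITH format = avro,
       confluent_schema_registry = 'http://localhost:8081',
       headers_json_column_name = 'kafka_headers';
~~~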
Fixes DOC-12596.

This PR adds docs for the new Kafka header option: a new section on the changefeed messages page, and a row in the options table on the `CREATE CHANGEFEED` page, which links back to the new section example on the messages page.

Preview: https://deploy-preview-19588--cockroachdb-docs.netlify.app/docs/v25.2/changefeed-messages.html#specify-a-column-as-a-kafka-header