Support for schema id in header #1978

Open · wants to merge 12 commits into master
103 changes: 102 additions & 1 deletion src/confluent_kafka/schema_registry/__init__.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import io
from typing import Optional

from .schema_registry_client import (
@@ -34,8 +35,14 @@
SchemaReference,
ServerConfig
)
from ..serialization import SerializationError, MessageField

_KEY_SCHEMA_ID = "__key_schema_id"
_VALUE_SCHEMA_ID = "__value_schema_id"

_MAGIC_BYTE = 0
_MAGIC_BYTE_V0 = _MAGIC_BYTE
_MAGIC_BYTE_V1 = 1

__all__ = [
"ConfigCompatibilityLevel",
@@ -55,7 +62,11 @@
"ServerConfig",
"topic_subject_name_strategy",
"topic_record_subject_name_strategy",
"record_subject_name_strategy"
"record_subject_name_strategy",
"header_schema_id_serializer",
"prefix_schema_id_serializer",
"dual_schema_id_deserializer",
"prefix_schema_id_deserializer"
]


@@ -113,3 +124,93 @@

"""
return schema_ref.name if schema_ref is not None else None


def header_schema_id_serializer(payload: bytes, ctx, schema_id) -> bytes:
    """
    Serializes the schema guid into the message headers.

    Args:
        payload (bytes): The payload to serialize.
        ctx (SerializationContext): Metadata pertaining to the serialization
            operation.
        schema_id (SchemaId): The schema ID to serialize.

    Returns:
        bytes: The unchanged payload; the schema guid is carried in the headers.
    """
    headers = ctx.headers
    if headers is None:
        raise SerializationError("Missing headers")
    header_key = _KEY_SCHEMA_ID if ctx.field == MessageField.KEY else _VALUE_SCHEMA_ID
    header_value = schema_id.guid_to_bytes()
    if isinstance(headers, list):
        headers.append((header_key, header_value))
    elif isinstance(headers, dict):
        headers[header_key] = header_value
    else:
        raise SerializationError("Invalid headers type")
    return payload
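A minimal usage sketch, assuming `SerializationContext` accepts a `headers` argument; `FakeSchemaId` is a hypothetical stand-in for `SchemaId`, not part of this PR:

```python
from confluent_kafka.serialization import SerializationContext, MessageField

class FakeSchemaId:
    # Hypothetical stand-in for SchemaId; only the method used here is stubbed.
    def guid_to_bytes(self) -> bytes:
        return b"\x01" + b"\x00" * 16  # assumed guid framing, for illustration

headers = []  # list-style headers, as accepted by Producer.produce()
ctx = SerializationContext("orders", MessageField.VALUE, headers)
out = header_schema_id_serializer(b"<avro bytes>", ctx, FakeSchemaId())

assert out == b"<avro bytes>"  # the payload itself is untouched
assert headers == [("__value_schema_id", b"\x01" + b"\x00" * 16)]
```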


def prefix_schema_id_serializer(payload: bytes, ctx, schema_id) -> bytes:
    """
    Serializes the schema id into the payload prefix.

    Args:
        payload (bytes): The payload to serialize.
        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. Unused by this strategy.
        schema_id (SchemaId): The schema ID to serialize.

    Returns:
        bytes: The payload prefixed with the schema id
    """
    return schema_id.id_to_bytes() + payload
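This is the classic Confluent wire format. A hedged sketch of the assumed V0 framing (`_MAGIC_BYTE_V0` followed by the 4-byte big-endian schema id):

```python
import struct

def encode_prefix_v0(schema_id: int, payload: bytes) -> bytes:
    # Assumed V0 framing: magic byte 0, then the schema id as a big-endian
    # unsigned 32-bit integer, then the serialized record.
    return struct.pack(">bI", 0, schema_id) + payload

framed = encode_prefix_v0(100, b"<record>")
assert framed[:5] == b"\x00\x00\x00\x00\x64"
```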


def dual_schema_id_deserializer(payload: bytes, ctx, schema_id) -> io.BytesIO:
    """
    Deserializes the schema id by first checking the header, then the payload prefix.

    Args:
        payload (bytes): The payload to deserialize.
        ctx (SerializationContext): Metadata pertaining to the deserialization
            operation.
        schema_id (SchemaId): The SchemaId instance to populate.

    Returns:
        io.BytesIO: The payload as a stream; if the schema id was read from the
            payload prefix, the stream is positioned past it.
    """
    headers = ctx.headers
    header_key = _KEY_SCHEMA_ID if ctx.field == MessageField.KEY else _VALUE_SCHEMA_ID
    if headers is not None:
        header_value = None
        if isinstance(headers, list):
            # Look for header_key among the (key, value) tuples.
            for header in headers:
                if header[0] == header_key:
                    header_value = header[1]
                    break
        elif isinstance(headers, dict):
            header_value = headers.get(header_key, None)
        if header_value is not None:
            schema_id.from_bytes(io.BytesIO(header_value))
            return io.BytesIO(payload)
    return schema_id.from_bytes(io.BytesIO(payload))
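A small sketch of the lookup order: when a matching header is present, the id is read from it and the full payload is returned as a stream; only otherwise is the payload prefix consumed. `RecordingSchemaId` below is a hypothetical stand-in for `SchemaId`:

```python
from confluent_kafka.serialization import SerializationContext, MessageField

class RecordingSchemaId:
    # Hypothetical stand-in; the real SchemaId parses the magic byte and id/guid.
    def from_bytes(self, stream):
        self.consumed = stream.read()
        return stream

sid = RecordingSchemaId()
ctx = SerializationContext("orders", MessageField.VALUE,
                           {"__value_schema_id": b"\x01" + b"\x00" * 16})
stream = dual_schema_id_deserializer(b"<framed payload>", ctx, sid)

assert sid.consumed == b"\x01" + b"\x00" * 16  # id came from the header
assert stream.read() == b"<framed payload>"    # payload stream left intact
```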


def prefix_schema_id_deserializer(payload: bytes, ctx, schema_id) -> io.BytesIO:
    """
    Deserializes the schema id from the payload prefix.

    Args:
        payload (bytes): The payload to deserialize.
        ctx (SerializationContext): Metadata pertaining to the deserialization
            operation. Unused by this strategy.
        schema_id (SchemaId): The SchemaId instance to populate.

    Returns:
        io.BytesIO: The payload as a stream, positioned past the schema id prefix.
    """
    return schema_id.from_bytes(io.BytesIO(payload))
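For completeness, a hedged sketch of how a reader might dispatch on the two magic bytes defined above (`_MAGIC_BYTE_V0` framing a 4-byte schema id; `_MAGIC_BYTE_V1` assumed here to frame a 16-byte guid — the real parsing lives in `SchemaId.from_bytes` and may differ):

```python
import io
import struct
import uuid

def parse_schema_id_framing(stream: io.BytesIO):
    # Hypothetical reader: returns (schema_id, guid) with one of them None,
    # leaving the stream positioned at the start of the serialized record.
    magic = stream.read(1)[0]
    if magic == _MAGIC_BYTE_V0:
        (schema_id,) = struct.unpack(">I", stream.read(4))
        return schema_id, None
    if magic == _MAGIC_BYTE_V1:
        return None, uuid.UUID(bytes=stream.read(16))
    raise ValueError("Unknown magic byte: {}".format(magic))

stream = io.BytesIO(struct.pack(">bI", 0, 100) + b"<record>")
print(parse_schema_id_framing(stream))  # (100, None)
print(stream.read())                    # b'<record>'
```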
149 changes: 80 additions & 69 deletions src/confluent_kafka/schema_registry/avro.py
@@ -21,7 +21,6 @@
from copy import deepcopy
from io import BytesIO
from json import loads
from struct import pack, unpack
from typing import Dict, Union, Optional, Set, Callable

from fastavro import (schemaless_reader,
@@ -30,16 +29,18 @@
validate)
from fastavro.schema import load_schema

from . import (_MAGIC_BYTE,
Schema,
from . import (Schema,
topic_subject_name_strategy,
RuleMode,
RuleKind, SchemaRegistryClient)
from confluent_kafka.serialization import (SerializationError,
SerializationContext)
RuleKind, SchemaRegistryClient, prefix_schema_id_serializer,
dual_schema_id_deserializer)
from confluent_kafka.serialization import (SerializationContext)
from .rule_registry import RuleRegistry
from .serde import BaseSerializer, BaseDeserializer, RuleContext, FieldType, \
FieldTransform, RuleConditionError, ParsedSchemaCache
FieldTransform, RuleConditionError, ParsedSchemaCache, SchemaId


AVRO_TYPE = "AVRO"


AvroMessage = Union[
@@ -164,6 +165,12 @@
| | | |
| | | Defaults to topic_subject_name_strategy. |
+-----------------------------+----------+--------------------------------------------------+
| | | Callable(bytes, SerializationContext, schema_id) |
| | | -> bytes |
| | | |
| ``schema.id.serializer`` | callable | Defines how the schema id/guid is serialized. |
| | | Defaults to prefix_schema_id_serializer. |
+-----------------------------+----------+--------------------------------------------------+
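For example, a producer-side configuration sketch selecting the header strategy (a hedged sketch: the `conf` layout mirrors the serializer's other options, and `schema_str` is assumed to be defined elsewhere):

```python
from confluent_kafka.schema_registry import (
    SchemaRegistryClient,
    header_schema_id_serializer,
)
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_serializer = AvroSerializer(
    schema_registry_client,
    schema_str,  # an Avro schema definition, defined elsewhere
    conf={'schema.id.serializer': header_schema_id_serializer})
```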

Schemas are registered against subject names in Confluent Schema Registry that
define a scope in which the schemas can be evolved. By default, the subject name
@@ -223,9 +230,10 @@
'use.schema.id': None,
'use.latest.version': False,
'use.latest.with.metadata': None,
'subject.name.strategy': topic_subject_name_strategy}
'subject.name.strategy': topic_subject_name_strategy,
'schema.id.serializer': prefix_schema_id_serializer}

def __init__(
self,
schema_registry_client: SchemaRegistryClient,
schema_str: Union[str, Schema, None] = None,
@@ -286,6 +294,10 @@
if not callable(self._subject_name_func):
raise ValueError("subject.name.strategy must be callable")

self._schema_id_serializer = conf_copy.pop('schema.id.serializer')
if not callable(self._schema_id_serializer):
raise ValueError("schema.id.serializer must be callable")

if len(conf_copy) > 0:
raise ValueError("Unrecognized properties: {}"
.format(", ".join(conf_copy.keys())))
@@ -345,19 +357,20 @@
subject = self._subject_name_func(ctx, self._schema_name)
latest_schema = self._get_reader_schema(subject)
if latest_schema is not None:
self._schema_id = latest_schema.schema_id
self._schema_id = SchemaId(AVRO_TYPE, latest_schema.schema_id, latest_schema.guid)
elif subject not in self._known_subjects:
# Check to ensure this schema has been registered under subject_name.
if self._auto_register:
# The schema name will always be the same. We can't however register
# a schema without a subject so we set the schema_id here to handle
# the initial registration.
self._schema_id = self._registry.register_schema(
registered_schema = self._registry.register_schema_full_response(
subject, self._schema, self._normalize_schemas)
self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)
else:
registered_schema = self._registry.lookup_schema(
subject, self._schema, self._normalize_schemas)
self._schema_id = registered_schema.schema_id
self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)

self._known_subjects.add(subject)

@@ -377,12 +390,9 @@
parsed_schema = self._parsed_schema

with _ContextStringIO() as fo:
# Write the magic byte and schema ID in network byte order (big endian)
fo.write(pack('>bI', _MAGIC_BYTE, self._schema_id))
# write the record to the rest of the buffer
schemaless_writer(fo, parsed_schema, value)

return fo.getvalue()
return self._schema_id_serializer(fo.getvalue(), ctx, self._schema_id)

def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
parsed_schema = self._parsed_schemas.get_parsed_schema(schema)
@@ -425,6 +435,12 @@
| | | |
| | | Defaults to topic_subject_name_strategy. |
+-----------------------------+----------+--------------------------------------------------+
| | | Callable(bytes, SerializationContext, schema_id) |
| | | -> io.BytesIO |
| | | |
| ``schema.id.deserializer`` | callable | Defines how the schema id/guid is deserialized. |
| | | Defaults to dual_schema_id_deserializer. |
+-----------------------------+----------+--------------------------------------------------+
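Correspondingly on the consumer side (a hedged sketch; `dual_schema_id_deserializer` is already the default, so passing it explicitly is purely illustrative):

```python
from confluent_kafka.schema_registry import (
    SchemaRegistryClient,
    dual_schema_id_deserializer,
)
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_deserializer = AvroDeserializer(
    schema_registry_client,
    conf={'schema.id.deserializer': dual_schema_id_deserializer})
```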

Note:
By default, Avro complex types are returned as dicts. This behavior can
@@ -462,9 +478,10 @@

_default_conf = {'use.latest.version': False,
'use.latest.with.metadata': None,
'subject.name.strategy': topic_subject_name_strategy}
'subject.name.strategy': topic_subject_name_strategy,
'schema.id.deserializer': dual_schema_id_deserializer}

def __init__(
self,
schema_registry_client: SchemaRegistryClient,
schema_str: Union[str, Schema, None] = None,
@@ -507,6 +524,10 @@
if not callable(self._subject_name_func):
raise ValueError("subject.name.strategy must be callable")

self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer')
if not callable(self._schema_id_deserializer):
raise ValueError("schema.id.deserializer must be callable")

if len(conf_copy) > 0:
raise ValueError("Unrecognized properties: {}"
.format(", ".join(conf_copy.keys())))
@@ -551,67 +572,57 @@
if data is None:
return None

if len(data) <= 5:
raise SerializationError("Expecting data framing of length 6 bytes or "
"more but total data size is {} bytes. This "
"message was not produced with a Confluent "
"Schema Registry serializer".format(len(data)))

subject = self._subject_name_func(ctx, None) if ctx else None
latest_schema = None
if subject is not None:
latest_schema = self._get_reader_schema(subject)

with _ContextStringIO(data) as payload:
magic, schema_id = unpack('>bI', payload.read(5))
if magic != _MAGIC_BYTE:
raise SerializationError("Unexpected magic byte {}. This message "
"was not produced with a Confluent "
"Schema Registry serializer".format(magic))

writer_schema_raw = self._registry.get_schema(schema_id)
writer_schema = self._get_parsed_schema(writer_schema_raw)

if subject is None:
subject = self._subject_name_func(ctx, writer_schema.get("name")) if ctx else None
if subject is not None:
latest_schema = self._get_reader_schema(subject)

if latest_schema is not None:
migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None)
reader_schema_raw = latest_schema.schema
reader_schema = self._get_parsed_schema(latest_schema.schema)
elif self._schema is not None:
migrations = None
reader_schema_raw = self._schema
reader_schema = self._reader_schema
else:
migrations = None
reader_schema_raw = writer_schema_raw
reader_schema = writer_schema

if migrations:
obj_dict = schemaless_reader(payload,
writer_schema,
None,
self._return_record_name)
obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
else:
obj_dict = schemaless_reader(payload,
writer_schema,
reader_schema,
self._return_record_name)
schema_id = SchemaId(AVRO_TYPE)
payload = self._schema_id_deserializer(data, ctx, schema_id)

writer_schema_raw = self._get_writer_schema(schema_id, subject)
writer_schema = self._get_parsed_schema(writer_schema_raw)

if subject is None:
subject = self._subject_name_func(ctx, writer_schema.get("name")) if ctx else None
if subject is not None:
latest_schema = self._get_reader_schema(subject)

if latest_schema is not None:
migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None)
reader_schema_raw = latest_schema.schema
reader_schema = self._get_parsed_schema(latest_schema.schema)
elif self._schema is not None:
migrations = None
reader_schema_raw = self._schema
reader_schema = self._reader_schema
else:
migrations = None
reader_schema_raw = writer_schema_raw
reader_schema = writer_schema

if migrations:
obj_dict = schemaless_reader(payload,
writer_schema,
None,
self._return_record_name)
obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
else:
obj_dict = schemaless_reader(payload,
writer_schema,
reader_schema,
self._return_record_name)

field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731
transform(rule_ctx, reader_schema, message, field_transform))
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
reader_schema_raw, obj_dict, get_inline_tags(reader_schema),
field_transformer)
field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731
transform(rule_ctx, reader_schema, message, field_transform))
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
reader_schema_raw, obj_dict, get_inline_tags(reader_schema),
field_transformer)

if self._from_dict is not None:
return self._from_dict(obj_dict, ctx)
if self._from_dict is not None:
return self._from_dict(obj_dict, ctx)

return obj_dict
return obj_dict

def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
parsed_schema = self._parsed_schemas.get_parsed_schema(schema)