From f212d46ba181bb1b82ff26b6b4dea0b50e7dd48f Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Tue, 7 Mar 2023 15:07:34 +0100 Subject: [PATCH] fix(api): fix connect schema mapping for map and array Resolved: #391 --- .../kafka/connect/filepulse/data/Schema.java | 2 +- .../source/internal/ConnectSchemaMapper.java | 17 +- .../internal/ConnectSchemaMapperTest.java | 166 +++++++++++++----- .../filepulse-google-cloud-storage-fs/pom.xml | 2 +- 4 files changed, 134 insertions(+), 53 deletions(-) diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/Schema.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/Schema.java index 0b230fcfa..507593d4d 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/Schema.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/Schema.java @@ -164,7 +164,7 @@ static ArraySchema array(final Collection value, final Schema valueSchema) { Type type() ; /** - * Checks whether this schemas is resolvable. + * Checks whether this schema is resolvable. * * @see LazyArraySchema * @see LazyMapSchema diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java index a53428811..988fe1481 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java @@ -297,8 +297,15 @@ private static Object toConnectObject(final Schema schema, final TypedValue type return typed.getMap().entrySet() .stream() .collect(Collectors.toMap( - e -> toConnectObject(connectKeySchema, TypedValue.of(e.getKey(), keySchema)), - e -> toConnectObject(connectValueSchema, TypedValue.of(e.getValue(), valueSchema)) + e -> { + TypedValue value = TypedValue.of(e.getKey(), keySchema); + return toConnectObject(connectKeySchema, value); + }, + e -> { + Object converted = valueSchema.type().convert(e.getValue()); + TypedValue elemValue = TypedValue.of(converted, valueSchema); + return toConnectObject(connectValueSchema, elemValue); + } ) ); } @@ -311,7 +318,11 @@ private static Object toConnectObject(final Schema schema, final TypedValue type return typed.getArray() .stream() - .map(e -> toConnectObject(connectValueSchema, TypedValue.of(e, valueSchema))) + .map(value -> { + Object converted = valueSchema.type().convert(value); + TypedValue elemValue = TypedValue.of(converted, valueSchema); + return toConnectObject(connectValueSchema, elemValue); + }) .collect(Collectors.toList()); } return typed.value(); diff --git a/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapperTest.java b/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapperTest.java index 2f1a16e8c..e2feae8fc 100644 --- a/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapperTest.java +++ b/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapperTest.java @@ -22,101 +22,120 @@ import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.List; import java.util.Map; -public class ConnectSchemaMapperTest { +class ConnectSchemaMapperTest { private ConnectSchemaMapper mapper; - @Before + @BeforeEach public void setUp() { mapper = new ConnectSchemaMapper(); } @Test public void should_map_given_simple_typed_struct() { - + // GIVEN TypedStruct struct = TypedStruct.create() .put("field1", "value1") .put("field2", "value2"); + // WHEN SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false); - Assert.assertNotNull(schemaAndValue); + Assertions.assertNotNull(schemaAndValue); + // THEN Struct connectStruct = (Struct)schemaAndValue.value(); - Assert.assertEquals("value1", connectStruct.get("field1")); - Assert.assertEquals("value2", connectStruct.get("field2")); + Assertions.assertEquals("value1", connectStruct.get("field1")); + Assertions.assertEquals("value2", connectStruct.get("field2")); } @Test public void should_map_given_nested_typed_struct() { + // GIVEN TypedStruct struct = TypedStruct.create() .put("field1", TypedStruct.create().put("field2", "value2")); + // WHEN SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false); - Assert.assertNotNull(schemaAndValue); + + // THEN + Assertions.assertNotNull(schemaAndValue); Struct connectStruct = (Struct)schemaAndValue.value(); Struct field1 = (Struct)connectStruct.get("field1"); - Assert.assertNotNull(field1); + Assertions.assertNotNull(field1); - Assert.assertEquals("value2", field1.get("field2")); + Assertions.assertEquals("value2", field1.get("field2")); } @Test @SuppressWarnings("unchecked") - public void should_map_given_type_struct_with_array_field() { + void should_map_given_type_struct_with_array_field() { + // GIVEN TypedStruct struct = TypedStruct.create() .put("field1", Collections.singletonList("value")); + // WHEN SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false); - Assert.assertNotNull(schemaAndValue); + + // THEN + Assertions.assertNotNull(schemaAndValue); Struct connectStruct = (Struct)schemaAndValue.value(); List field1 = (List)connectStruct.get("field1"); - Assert.assertNotNull(field1); - Assert.assertEquals("value", field1.get(0)); + Assertions.assertNotNull(field1); + Assertions.assertEquals("value", field1.get(0)); } @Test @SuppressWarnings("unchecked") - public void should_Map_given_type_struct_with_array_of_struct() { + void should_map_given_type_struct_with_array_of_struct() { + // GIVEN TypedStruct struct = TypedStruct.create() .put("field1", Collections.singletonList(TypedStruct.create().put("field2", "value"))); + // WHEN SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false); - Assert.assertNotNull(schemaAndValue); + + // THEN + Assertions.assertNotNull(schemaAndValue); Struct connectStruct = (Struct)schemaAndValue.value(); List field1 = (List)connectStruct.get("field1"); - Assert.assertNotNull(field1); - Assert.assertEquals("value", field1.get(0).getString("field2")); + Assertions.assertNotNull(field1); + Assertions.assertEquals("value", field1.get(0).getString("field2")); } @Test @SuppressWarnings("unchecked") - public void shouldMapGivenTypeStructWithMapField() { + void should_map_given_type_struct_with_map_field() { + // GIVEN TypedStruct struct = TypedStruct.create() .put("field1", Collections.singletonMap("field2", "value")); + // WHEN SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false); - Assert.assertNotNull(schemaAndValue); + + // THEN + Assertions.assertNotNull(schemaAndValue); Struct connectStruct = (Struct)schemaAndValue.value(); Map field1 = (Map)connectStruct.get("field1"); - Assert.assertNotNull(field1); - Assert.assertEquals("value", field1.get("field2")); + Assertions.assertNotNull(field1); + Assertions.assertEquals("value", field1.get("field2")); } @Test @SuppressWarnings("unchecked") - public void should_map_given_type_struct_with_map_with_struct_value() { + void should_map_given_type_struct_with_map_with_struct_value() { + // GIVEN TypedStruct struct = TypedStruct.create() .put("field1", Collections.singletonMap( @@ -124,61 +143,77 @@ public void should_map_given_type_struct_with_map_with_struct_value() { TypedStruct.create().put("field3", "value") )); + // WHEN SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false); - Assert.assertNotNull(schemaAndValue); + // THEN + Assertions.assertNotNull(schemaAndValue); Struct connectStruct = (Struct)schemaAndValue.value(); Map field1 = (Map)connectStruct.get("field1"); - Assert.assertNotNull(field1); - Assert.assertEquals("value", field1.get("field2").getString("field3")); + Assertions.assertNotNull(field1); + Assertions.assertEquals("value", field1.get("field2").getString("field3")); } @Test - public void should_map_given_type_struct_with_null_value() { + void should_map_given_type_struct_with_null_value() { + // GIVEN TypedStruct struct = TypedStruct.create() .put("field1", "value1") .put("field2", Schema.none(), null); + // WHEN SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false); - Assert.assertNotNull(schemaAndValue); + // THEN + Assertions.assertNotNull(schemaAndValue); Struct connectStruct = (Struct) schemaAndValue.value(); - Assert.assertNotNull(connectStruct.schema().field("field1")); - Assert.assertNull(connectStruct.schema().field("field2")); + Assertions.assertNotNull(connectStruct.schema().field("field1")); + Assertions.assertNull(connectStruct.schema().field("field2")); } @Test - public void should_map_given_type_struct_with_empty_array() { + void should_map_given_type_struct_with_empty_array() { + // GIVEN TypedStruct struct = TypedStruct.create() .put("field1", "value1") .put("field2", Schema.array(Collections.emptyList(), null), Collections.emptyList()); + // WHEN SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false); - Assert.assertNotNull(schemaAndValue); + // THEN + Assertions.assertNotNull(schemaAndValue); Struct connectStruct = (Struct)schemaAndValue.value(); - Assert.assertNotNull(connectStruct.schema().field("field1")); - Assert.assertNull(connectStruct.schema().field("field2")); + Assertions.assertNotNull(connectStruct.schema().field("field1")); + Assertions.assertNull(connectStruct.schema().field("field2")); } @Test - public void test_normalize_schema_name_given_leading_underscore_false() { + void test_normalize_schema_name_given_leading_underscore_false() { + // GIVEN var mapper = new ConnectSchemaMapper(); - Assert.assertEquals("Foo", mapper.normalizeSchemaName("foo")); - Assert.assertEquals("FooBar", mapper.normalizeSchemaName("foo_bar")); - Assert.assertEquals("FooBar", mapper.normalizeSchemaName("foo.bar")); - Assert.assertEquals("FooBar", mapper.normalizeSchemaName("__foo_bar")); + + // WHEN - THEN + Assertions.assertEquals("Foo", mapper.normalizeSchemaName("foo")); + Assertions.assertEquals("FooBar", mapper.normalizeSchemaName("foo_bar")); + Assertions.assertEquals("FooBar", mapper.normalizeSchemaName("foo.bar")); + Assertions.assertEquals("FooBar", mapper.normalizeSchemaName("__foo_bar")); } @Test - public void test_normalize_schema_name_given_leading_underscore_true() { + void test_normalize_schema_name_given_leading_underscore_true() { + // GIVEN var mapper = new ConnectSchemaMapper(); mapper.setKeepLeadingUnderscoreCharacters(true); - Assert.assertEquals("__FooBar", mapper.normalizeSchemaName("__foo_bar")); + + // WHEN - THEN + Assertions.assertEquals("__FooBar", mapper.normalizeSchemaName("__foo_bar")); } @Test - public void should_map_given_struct_with_duplicate_schema() { + void should_map_given_struct_with_duplicate_schema() { + // GIVEN + ConnectSchemaMapper mapper = new ConnectSchemaMapper(); final TypedStruct struct = TypedStruct.create() .put("field1", TypedStruct.create("Foo") .put("field1", "val") @@ -188,12 +223,47 @@ public void should_map_given_struct_with_duplicate_schema() { .put("field3", "val") ); - final org.apache.kafka.connect.data.Schema connectSchema = new ConnectSchemaMapper() - .map(struct.schema(), false); - Assert.assertEquals( + // WHEN + final org.apache.kafka.connect.data.Schema connectSchema = mapper.map(struct.schema(), false); + + // THEN + Assertions.assertEquals( connectSchema.field("field1").schema(), connectSchema.field("field2").schema() ); } -} \ No newline at end of file + @Test + void should_map_struct_with_map_containing_different_type_value() { + // GIVEN + final TypedStruct struct = TypedStruct.create() + .put("map", Map.of("k1", "string", "k2", 0L)); + + // WHEN + SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false); + + // THEN + Assertions.assertNotNull(schemaAndValue); + + org.apache.kafka.connect.data.Schema mapSchema = schemaAndValue.schema().field("map").schema(); + Assertions.assertEquals(org.apache.kafka.connect.data.Schema.Type.MAP, mapSchema.schema().type()); + Assertions.assertEquals(org.apache.kafka.connect.data.Schema.Type.STRING, mapSchema.schema().valueSchema().type()); + } + + @Test + void should_map_struct_with_array_containing_different_type_value() { + // GIVEN + final TypedStruct struct = TypedStruct + .create() + .put("array", List.of("string", 0L)); + + // WHEN + SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false); + + // THEN + Assertions.assertNotNull(schemaAndValue); + org.apache.kafka.connect.data.Schema mapSchema = schemaAndValue.schema().field("array").schema(); + Assertions.assertEquals(org.apache.kafka.connect.data.Schema.Type.ARRAY, mapSchema.schema().type()); + Assertions.assertEquals(org.apache.kafka.connect.data.Schema.Type.STRING, mapSchema.schema().valueSchema().type()); + } +} diff --git a/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml index 1b996183a..dc985d9af 100644 --- a/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml @@ -56,7 +56,7 @@ com.google.cloud google-cloud-nio - 0.126.0 + 0.126.7 test