Skip to content

fix(api): fix connect schema mapping for map and array #397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ static ArraySchema array(final Collection<?> value, final Schema valueSchema) {
Type type() ;

/**
* Checks whether this schemas is resolvable.
* Checks whether this schema is resolvable.
*
* @see LazyArraySchema
* @see LazyMapSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,15 @@ private static Object toConnectObject(final Schema schema, final TypedValue type
return typed.getMap().entrySet()
.stream()
.collect(Collectors.toMap(
e -> toConnectObject(connectKeySchema, TypedValue.of(e.getKey(), keySchema)),
e -> toConnectObject(connectValueSchema, TypedValue.of(e.getValue(), valueSchema))
e -> {
TypedValue value = TypedValue.of(e.getKey(), keySchema);
return toConnectObject(connectKeySchema, value);
},
e -> {
Object converted = valueSchema.type().convert(e.getValue());
TypedValue elemValue = TypedValue.of(converted, valueSchema);
return toConnectObject(connectValueSchema, elemValue);
}
)
);
}
Expand All @@ -311,7 +318,11 @@ private static Object toConnectObject(final Schema schema, final TypedValue type

return typed.getArray()
.stream()
.map(e -> toConnectObject(connectValueSchema, TypedValue.of(e, valueSchema)))
.map(value -> {
Object converted = valueSchema.type().convert(value);
TypedValue elemValue = TypedValue.of(converted, valueSchema);
return toConnectObject(connectValueSchema, elemValue);
})
.collect(Collectors.toList());
}
return typed.value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,163 +22,198 @@
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ConnectSchemaMapperTest {
class ConnectSchemaMapperTest {

private ConnectSchemaMapper mapper;

@Before
@BeforeEach
public void setUp() {
mapper = new ConnectSchemaMapper();
}

@Test
public void should_map_given_simple_typed_struct() {

// GIVEN
TypedStruct struct = TypedStruct.create()
.put("field1", "value1")
.put("field2", "value2");

// WHEN
SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false);
Assert.assertNotNull(schemaAndValue);
Assertions.assertNotNull(schemaAndValue);

// THEN
Struct connectStruct = (Struct)schemaAndValue.value();
Assert.assertEquals("value1", connectStruct.get("field1"));
Assert.assertEquals("value2", connectStruct.get("field2"));
Assertions.assertEquals("value1", connectStruct.get("field1"));
Assertions.assertEquals("value2", connectStruct.get("field2"));
}

@Test
public void should_map_given_nested_typed_struct() {
// GIVEN
TypedStruct struct = TypedStruct.create()
.put("field1", TypedStruct.create().put("field2", "value2"));

// WHEN
SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false);
Assert.assertNotNull(schemaAndValue);

// THEN
Assertions.assertNotNull(schemaAndValue);

Struct connectStruct = (Struct)schemaAndValue.value();
Struct field1 = (Struct)connectStruct.get("field1");
Assert.assertNotNull(field1);
Assertions.assertNotNull(field1);

Assert.assertEquals("value2", field1.get("field2"));
Assertions.assertEquals("value2", field1.get("field2"));
}

@Test
@SuppressWarnings("unchecked")
public void should_map_given_type_struct_with_array_field() {
void should_map_given_type_struct_with_array_field() {
// GIVEN
TypedStruct struct = TypedStruct.create()
.put("field1", Collections.singletonList("value"));

// WHEN
SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false);
Assert.assertNotNull(schemaAndValue);

// THEN
Assertions.assertNotNull(schemaAndValue);

Struct connectStruct = (Struct)schemaAndValue.value();
List<String> field1 = (List<String>)connectStruct.get("field1");
Assert.assertNotNull(field1);
Assert.assertEquals("value", field1.get(0));
Assertions.assertNotNull(field1);
Assertions.assertEquals("value", field1.get(0));
}

@Test
@SuppressWarnings("unchecked")
public void should_Map_given_type_struct_with_array_of_struct() {
void should_map_given_type_struct_with_array_of_struct() {
// GIVEN
TypedStruct struct = TypedStruct.create()
.put("field1", Collections.singletonList(TypedStruct.create().put("field2", "value")));

// WHEN
SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false);
Assert.assertNotNull(schemaAndValue);

// THEN
Assertions.assertNotNull(schemaAndValue);

Struct connectStruct = (Struct)schemaAndValue.value();
List<Struct> field1 = (List<Struct>)connectStruct.get("field1");
Assert.assertNotNull(field1);
Assert.assertEquals("value", field1.get(0).getString("field2"));
Assertions.assertNotNull(field1);
Assertions.assertEquals("value", field1.get(0).getString("field2"));
}

@Test
@SuppressWarnings("unchecked")
public void shouldMapGivenTypeStructWithMapField() {
void should_map_given_type_struct_with_map_field() {
// GIVEN
TypedStruct struct = TypedStruct.create()
.put("field1", Collections.singletonMap("field2", "value"));

// WHEN
SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false);
Assert.assertNotNull(schemaAndValue);

// THEN
Assertions.assertNotNull(schemaAndValue);

Struct connectStruct = (Struct)schemaAndValue.value();
Map<String, String> field1 = (Map<String, String>)connectStruct.get("field1");
Assert.assertNotNull(field1);
Assert.assertEquals("value", field1.get("field2"));
Assertions.assertNotNull(field1);
Assertions.assertEquals("value", field1.get("field2"));
}

@Test
@SuppressWarnings("unchecked")
public void should_map_given_type_struct_with_map_with_struct_value() {
void should_map_given_type_struct_with_map_with_struct_value() {
// GIVEN
TypedStruct struct = TypedStruct.create()
.put("field1",
Collections.singletonMap(
"field2",
TypedStruct.create().put("field3", "value")
));

// WHEN
SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false);
Assert.assertNotNull(schemaAndValue);

// THEN
Assertions.assertNotNull(schemaAndValue);
Struct connectStruct = (Struct)schemaAndValue.value();
Map<String, Struct> field1 = (Map<String, Struct>)connectStruct.get("field1");
Assert.assertNotNull(field1);
Assert.assertEquals("value", field1.get("field2").getString("field3"));
Assertions.assertNotNull(field1);
Assertions.assertEquals("value", field1.get("field2").getString("field3"));
}

@Test
public void should_map_given_type_struct_with_null_value() {
void should_map_given_type_struct_with_null_value() {
// GIVEN
TypedStruct struct = TypedStruct.create()
.put("field1", "value1")
.put("field2", Schema.none(), null);

// WHEN
SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false);
Assert.assertNotNull(schemaAndValue);

// THEN
Assertions.assertNotNull(schemaAndValue);
Struct connectStruct = (Struct) schemaAndValue.value();
Assert.assertNotNull(connectStruct.schema().field("field1"));
Assert.assertNull(connectStruct.schema().field("field2"));
Assertions.assertNotNull(connectStruct.schema().field("field1"));
Assertions.assertNull(connectStruct.schema().field("field2"));
}

@Test
public void should_map_given_type_struct_with_empty_array() {
void should_map_given_type_struct_with_empty_array() {
// GIVEN
TypedStruct struct = TypedStruct.create()
.put("field1", "value1")
.put("field2", Schema.array(Collections.emptyList(), null), Collections.emptyList());

// WHEN
SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false);
Assert.assertNotNull(schemaAndValue);

// THEN
Assertions.assertNotNull(schemaAndValue);
Struct connectStruct = (Struct)schemaAndValue.value();
Assert.assertNotNull(connectStruct.schema().field("field1"));
Assert.assertNull(connectStruct.schema().field("field2"));
Assertions.assertNotNull(connectStruct.schema().field("field1"));
Assertions.assertNull(connectStruct.schema().field("field2"));
}

@Test
public void test_normalize_schema_name_given_leading_underscore_false() {
void test_normalize_schema_name_given_leading_underscore_false() {
// GIVEN
var mapper = new ConnectSchemaMapper();
Assert.assertEquals("Foo", mapper.normalizeSchemaName("foo"));
Assert.assertEquals("FooBar", mapper.normalizeSchemaName("foo_bar"));
Assert.assertEquals("FooBar", mapper.normalizeSchemaName("foo.bar"));
Assert.assertEquals("FooBar", mapper.normalizeSchemaName("__foo_bar"));

// WHEN - THEN
Assertions.assertEquals("Foo", mapper.normalizeSchemaName("foo"));
Assertions.assertEquals("FooBar", mapper.normalizeSchemaName("foo_bar"));
Assertions.assertEquals("FooBar", mapper.normalizeSchemaName("foo.bar"));
Assertions.assertEquals("FooBar", mapper.normalizeSchemaName("__foo_bar"));
}

@Test
public void test_normalize_schema_name_given_leading_underscore_true() {
void test_normalize_schema_name_given_leading_underscore_true() {
// GIVEN
var mapper = new ConnectSchemaMapper();
mapper.setKeepLeadingUnderscoreCharacters(true);
Assert.assertEquals("__FooBar", mapper.normalizeSchemaName("__foo_bar"));

// WHEN - THEN
Assertions.assertEquals("__FooBar", mapper.normalizeSchemaName("__foo_bar"));
}

@Test
public void should_map_given_struct_with_duplicate_schema() {
void should_map_given_struct_with_duplicate_schema() {
// GIVEN
ConnectSchemaMapper mapper = new ConnectSchemaMapper();
final TypedStruct struct = TypedStruct.create()
.put("field1", TypedStruct.create("Foo")
.put("field1", "val")
Expand All @@ -188,12 +223,47 @@ public void should_map_given_struct_with_duplicate_schema() {
.put("field3", "val")
);

final org.apache.kafka.connect.data.Schema connectSchema = new ConnectSchemaMapper()
.map(struct.schema(), false);
Assert.assertEquals(
// WHEN
final org.apache.kafka.connect.data.Schema connectSchema = mapper.map(struct.schema(), false);

// THEN
Assertions.assertEquals(
connectSchema.field("field1").schema(),
connectSchema.field("field2").schema()
);
}

}
@Test
void should_map_struct_with_map_containing_different_type_value() {
// GIVEN
final TypedStruct struct = TypedStruct.create()
.put("map", Map.of("k1", "string", "k2", 0L));

// WHEN
SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false);

// THEN
Assertions.assertNotNull(schemaAndValue);

org.apache.kafka.connect.data.Schema mapSchema = schemaAndValue.schema().field("map").schema();
Assertions.assertEquals(org.apache.kafka.connect.data.Schema.Type.MAP, mapSchema.schema().type());
Assertions.assertEquals(org.apache.kafka.connect.data.Schema.Type.STRING, mapSchema.schema().valueSchema().type());
}

@Test
void should_map_struct_with_array_containing_different_type_value() {
// GIVEN
final TypedStruct struct = TypedStruct
.create()
.put("array", List.of("string", 0L));

// WHEN
SchemaAndValue schemaAndValue = struct.schema().map(mapper, struct, false);

// THEN
Assertions.assertNotNull(schemaAndValue);
org.apache.kafka.connect.data.Schema mapSchema = schemaAndValue.schema().field("array").schema();
Assertions.assertEquals(org.apache.kafka.connect.data.Schema.Type.ARRAY, mapSchema.schema().type());
Assertions.assertEquals(org.apache.kafka.connect.data.Schema.Type.STRING, mapSchema.schema().valueSchema().type());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-nio</artifactId>
<version>0.126.0</version>
<version>0.126.7</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down