Skip to content

Commit d60e574

Browse files
committed
fix(api): connector should throw an error when merging struct with string
1 parent 6f6decc commit d60e574

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/schema/SchemaMerger.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ public static Schema merge(final Schema left, final Schema right, final SchemaCo
7777
return left;
7878
}
7979

80-
if (left.type() == Type.STRING || right.type() == Type.STRING)
80+
if ( (left.type() == Type.STRING && right.type().isPrimitive()) ||
81+
(right.type() == Type.STRING && left.type().isPrimitive()) )
8182
return mergeMetadata(left, right, SchemaBuilder.string());
8283

8384
if ( (left.type() == Type.INT64 && right.type() == Type.INT32) ||

connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/schema/SchemaMergerTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.schema;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.data.DataException;
2122
import org.apache.kafka.connect.data.Schema;
2223
import org.apache.kafka.connect.data.SchemaBuilder;
2324
import org.junit.Assert;
@@ -168,4 +169,15 @@ public void should_success_merge_given_duplicate_schemas() {
168169
schema.field(DEFAULT_FIELD_C).schema().field("field3").schema()
169170
);
170171
}
172+
173+
@Test(expected = DataException.class)
174+
public void should_throw_error_when_merging_struct_given_string() {
175+
// Given
176+
Schema schemaLeft = SchemaBuilder.struct()
177+
.field(DEFAULT_FIELD_A, SchemaBuilder.array(SchemaBuilder.string()))
178+
.build();
179+
180+
// When
181+
SchemaMerger.merge(schemaLeft, SchemaBuilder.string());
182+
}
171183
}

0 commit comments

Comments
 (0)