Skip to content

Commit 4fc2cb9

Browse files
committed
feat(expression): add expression function TimestampDiff
1 parent d76bac0 commit 4fc2cb9

File tree

3 files changed

+197
-0
lines changed

3 files changed

+197
-0
lines changed

connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionFunctionExecutors.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.streamthoughts.kafka.connect.filepulse.expression.function.conditions.If;
2828
import io.streamthoughts.kafka.connect.filepulse.expression.function.conditions.LessThan;
2929
import io.streamthoughts.kafka.connect.filepulse.expression.function.conditions.Or;
30+
import io.streamthoughts.kafka.connect.filepulse.expression.function.datetime.TimestampDiff;
3031
import io.streamthoughts.kafka.connect.filepulse.expression.function.datetime.ToTimestamp;
3132
import io.streamthoughts.kafka.connect.filepulse.expression.function.datetime.UnixTimestamp;
3233
import io.streamthoughts.kafka.connect.filepulse.expression.function.objects.Converts;
@@ -95,6 +96,7 @@ private ExpressionFunctionExecutors() {
9596
register(new Split());
9697
register(new UnixTimestamp());
9798
register(new ToTimestamp());
99+
register(new TimestampDiff());
98100
register(new If());
99101
register(new And());
100102
register(new Or());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.expression.function.datetime;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.annotation.VisibleForTesting;
22+
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
23+
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
24+
import io.streamthoughts.kafka.connect.filepulse.expression.ExpressionException;
25+
import io.streamthoughts.kafka.connect.filepulse.expression.ValueExpression;
26+
import io.streamthoughts.kafka.connect.filepulse.expression.function.Arguments;
27+
import io.streamthoughts.kafka.connect.filepulse.expression.function.ExecutionContext;
28+
import io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionFunction;
29+
30+
import java.time.Instant;
31+
import java.time.temporal.ChronoUnit;
32+
33+
/**
34+
* An {@link ExpressionFunction} to compute the time difference between two epoch-time in milliseconds.
35+
*/
36+
public class TimestampDiff implements ExpressionFunction {
37+
38+
/**
39+
* {@inheritDoc}
40+
*/
41+
@Override
42+
public Instance get() {
43+
return new TimestampDiffInstance();
44+
}
45+
46+
public static class TimestampDiffInstance implements ExpressionFunction.Instance {
47+
48+
private static final String CHRONO_UNIT_ARG = "unit";
49+
private static final String EPOCH_MILLI_1_ARG = "epoch_millis_expr1";
50+
private static final String EPOCH_MILLI_2_ARG = "epoch_millis_expr2";
51+
52+
private ChronoUnit chronoUnit;
53+
54+
/**
55+
* {@inheritDoc}
56+
*/
57+
@Override
58+
public Arguments prepare(final Expression[] args) throws ExpressionException {
59+
final String unit = ((ValueExpression) args[0]).value().getString();
60+
chronoUnit = ChronoUnit.valueOf(unit.toUpperCase());
61+
62+
return Arguments.of(
63+
CHRONO_UNIT_ARG, args[0],
64+
EPOCH_MILLI_1_ARG, args[1],
65+
EPOCH_MILLI_2_ARG, args[2]
66+
);
67+
}
68+
69+
public TypedValue invoke(final ExecutionContext context) throws ExpressionException {
70+
final Long epochTimeLeft = context.get(1).getLong();
71+
final Long epochTimeRight = context.get(2).getLong();
72+
73+
long between = Math.abs(chronoUnit.between(toInstant(epochTimeLeft), toInstant(epochTimeRight)));
74+
75+
// We need to remove milliseconds precision when one of the two given epoch-timestamps
76+
// is in seconds but result unit is in milliseconds.
77+
if (chronoUnit == ChronoUnit.MILLIS &&
78+
!(isEpochTimeInMillis(epochTimeLeft) && isEpochTimeInMillis(epochTimeRight)) ) {
79+
between = between < 1000 ? 0 : (between / 1000) * 1000;
80+
}
81+
82+
return TypedValue.int64(between);
83+
}
84+
85+
/**
86+
* Converts a given epoch time in seconds or milliseconds to {@link Instant}.
87+
*
88+
* @param epochTime the epoch-time to convert.
89+
* @return a new {@link Instant}
90+
*/
91+
@VisibleForTesting
92+
static Instant toInstant(final long epochTime) {
93+
94+
if (isEpochTimeInMillis(epochTime)) {
95+
return Instant.ofEpochMilli(epochTime);
96+
} else {
97+
return Instant.ofEpochSecond(epochTime);
98+
}
99+
}
100+
101+
private static boolean isEpochTimeInMillis(long epochTime) {
102+
// If the given epoch-time has more than 11 digits then
103+
// it seems to be safe to assume that this timestamp is in milliseconds
104+
// i.e a the epoch-time represents a date after the Sat Mar 03 1973 09:46:39, otherwise, is in seconds.
105+
return String.valueOf(epochTime).length() > 11;
106+
}
107+
}
108+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.expression.function.datetime;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
22+
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
23+
import io.streamthoughts.kafka.connect.filepulse.expression.StandardEvaluationContext;
24+
import org.junit.Assert;
25+
import org.junit.Test;
26+
27+
import java.time.Instant;
28+
import java.time.temporal.ChronoUnit;
29+
30+
import static io.streamthoughts.kafka.connect.filepulse.expression.parser.ExpressionParsers.parseExpression;
31+
32+
public class TimestampDiffTest {
33+
34+
@Test
35+
public void should_get_diff_in_seconds_given_epoch_time_in_millis() {
36+
// Given
37+
38+
final Instant now = Instant.now();
39+
final Instant minus = now.minusSeconds(10);
40+
Expression expression = buildExpressionFor(now.toEpochMilli(), minus.toEpochMilli(), ChronoUnit.SECONDS);
41+
StandardEvaluationContext context = new StandardEvaluationContext(new Object());
42+
43+
// When
44+
TypedValue result = expression.readValue(context, TypedValue.class);
45+
46+
// THEN
47+
Assert.assertEquals(10, result.getLong().longValue());
48+
}
49+
50+
@Test
51+
public void should_get_diff_in_millis_given_epoch_time_in_millis() {
52+
// Given
53+
54+
final Instant now = Instant.now();
55+
final Instant minus = now.minusSeconds(10);
56+
Expression expression = buildExpressionFor(now.toEpochMilli(), minus.toEpochMilli(), ChronoUnit.MILLIS);
57+
StandardEvaluationContext context = new StandardEvaluationContext(new Object());
58+
59+
// When
60+
TypedValue result = expression.readValue(context, TypedValue.class);
61+
62+
// THEN
63+
Assert.assertEquals(10_000, result.getLong().longValue());
64+
}
65+
66+
@Test
67+
public void should_get_diff_in_millis_given_epoch_time_in_seconds() {
68+
// Given
69+
70+
final Instant now = Instant.now();
71+
final Instant minus = now.minusSeconds(10);
72+
Expression expression = buildExpressionFor(now.toEpochMilli(), minus.getEpochSecond(), ChronoUnit.MILLIS);
73+
StandardEvaluationContext context = new StandardEvaluationContext(new Object());
74+
75+
// When
76+
TypedValue result = expression.readValue(context, TypedValue.class);
77+
78+
// THEN
79+
Assert.assertEquals(10_000, result.getLong().longValue());
80+
}
81+
82+
public Expression buildExpressionFor(final Long epochTimeLeft,
83+
final Long epochTimeRight,
84+
final ChronoUnit unit) {
85+
return parseExpression("{{ timestamp_diff('" + unit + "', " + epochTimeLeft + ", " + epochTimeRight + ") }}");
86+
}
87+
}

0 commit comments

Comments
 (0)