Skip to content

Commit a2902b0

Browse files
committed
WIP: Jetty 12 changes
1 parent c502f53 commit a2902b0

File tree

7 files changed

+24
-22
lines changed

7 files changed

+24
-22
lines changed

activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import java.net.URI;
2222
import java.util.Map;
2323

24-
import jakarta.servlet.Servlet;
25-
2624
import org.apache.activemq.broker.BrokerService;
2725
import org.apache.activemq.broker.BrokerServiceAware;
2826
import org.apache.activemq.command.BrokerInfo;

activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public class MQTTWSConnection extends WebSocketAdapter implements AutoDemanding
5555

5656
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
5757

58-
private Session connection;
5958
private final CountDownLatch connectLatch = new CountDownLatch(1);
6059
private final MQTTWireFormat wireFormat = new MQTTWireFormat();
6160

@@ -67,17 +66,17 @@ public class MQTTWSConnection extends WebSocketAdapter implements AutoDemanding
6766

6867
@Override
6968
public boolean isConnected() {
70-
return connection != null ? connection.isOpen() : false;
69+
return getSession() != null ? getSession().isOpen() : false;
7170
}
7271

7372
public void close() {
74-
if (connection != null) {
75-
connection.close();
73+
if (getSession() != null) {
74+
getSession().close();
7675
}
7776
}
7877

7978
protected Session getConnection() {
80-
return connection;
79+
return getSession();
8180
}
8281

8382
//----- Connection and Disconnection methods -----------------------------//
@@ -193,7 +192,7 @@ public void onWebSocketBinary(byte[] data, int offset, int length) {
193192
frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(data, offset, length));
194193
} catch (IOException e) {
195194
LOG.error("Could not decode incoming MQTT Frame: {}", e.getMessage());
196-
connection.close();
195+
getSession().close();
197196
}
198197

199198
try {
@@ -243,23 +242,23 @@ public void onWebSocketBinary(byte[] data, int offset, int length) {
243242
break;
244243
default:
245244
LOG.error("Unknown MQTT Frame received.");
246-
connection.close();
245+
getSession().close();
247246
}
248247
} catch (Exception e) {
249248
LOG.error("Could not decode incoming MQTT Frame: {}", e.getMessage());
250-
connection.close();
249+
getSession().close();
251250
}
252251
}
253252

254253
//----- Internal implementation ------------------------------------------//
255254

256255
private void sendBytes(ByteSequence payload) throws IOException {
257256
if (!isWritePartialFrames()) {
258-
connection.getRemote().sendBytes(ByteBuffer.wrap(payload.data, payload.offset, payload.length));
257+
getRemote().sendBytes(ByteBuffer.wrap(payload.data, payload.offset, payload.length));
259258
} else {
260-
connection.getRemote().sendBytes(ByteBuffer.wrap(
259+
getRemote().sendBytes(ByteBuffer.wrap(
261260
payload.data, payload.offset, payload.length / 2));
262-
connection.getRemote().sendBytes(ByteBuffer.wrap(
261+
getRemote().sendBytes(ByteBuffer.wrap(
263262
payload.data, payload.offset + payload.length / 2, payload.length / 2));
264263
}
265264
}
@@ -274,16 +273,16 @@ private void checkConnected() throws IOException {
274273
public void onWebSocketClose(int statusCode, String reason) {
275274
LOG.trace("MQTT WS Connection closed, code:{} message:{}", statusCode, reason);
276275

277-
this.connection = null;
276+
getSession().close(statusCode, reason);
278277
this.closeCode = statusCode;
279278
this.closeMessage = reason;
280279

281280
}
282281

283282
@Override
284283
public void onWebSocketConnect(org.eclipse.jetty.ee9.websocket.api.Session session) {
285-
this.connection = session;
286-
this.connection.setIdleTimeout(Duration.ZERO);
284+
super.onWebSocketConnect(session);
285+
getSession().setIdleTimeout(Duration.ZERO);
287286
this.connectLatch.countDown();
288287
}
289288
}

activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
*/
1717
package org.apache.activemq.transport.ws;
1818

19+
import static org.junit.Assert.assertNotNull;
1920
import static org.junit.Assert.assertTrue;
2021
import static org.junit.Assert.fail;
2122

2223
import java.io.IOException;
2324
import java.util.Arrays;
2425
import java.util.Collection;
2526
import java.util.UUID;
27+
import java.util.concurrent.CompletableFuture;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.atomic.AtomicBoolean;
2830

@@ -31,6 +33,7 @@
3133
import org.eclipse.jetty.client.transport.HttpClientTransportDynamic;
3234
import org.eclipse.jetty.io.ClientConnector;
3335
import org.eclipse.jetty.util.ssl.SslContextFactory;
36+
import org.eclipse.jetty.websocket.api.Session;
3437
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
3538
import org.eclipse.jetty.websocket.client.WebSocketClient;
3639
import org.fusesource.hawtbuf.UTF8Buffer;
@@ -85,7 +88,9 @@ public void setUp() throws Exception {
8588

8689
wsMQTTConnection = new MQTTWSConnection().setWritePartialFrames(partialFrames);
8790

88-
wsClient.connect(wsMQTTConnection, wsConnectUri, request);
91+
CompletableFuture<Session> webSocketFuture = wsClient.connect(wsMQTTConnection, wsConnectUri, request);
92+
Session webSocketSession = webSocketFuture.get(30, TimeUnit.SECONDS);
93+
assertNotNull(webSocketSession);
8994
if (!wsMQTTConnection.awaitConnection(30, TimeUnit.SECONDS)) {
9095
throw new IOException("Could not connect to MQTT WS endpoint");
9196
}
@@ -110,7 +115,8 @@ public void testConnectCycles() throws Exception {
110115

111116
wsMQTTConnection = new MQTTWSConnection().setWritePartialFrames(partialFrames);
112117

113-
wsClient.connect(wsMQTTConnection, wsConnectUri, request);
118+
CompletableFuture<Session> wsClientFuture = wsClient.connect(wsMQTTConnection, wsConnectUri, request);
119+
wsClientFuture.get(30, TimeUnit.SECONDS);
114120
if (!wsMQTTConnection.awaitConnection(30, TimeUnit.SECONDS)) {
115121
throw new IOException("Could not connect to MQTT WS endpoint");
116122
}

activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportHttpTraceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.junit.runner.RunWith;
2626
import org.junit.runners.Parameterized;
2727

28-
@Ignore
28+
2929
@RunWith(Parameterized.class)
3030
public class WSSTransportHttpTraceTest extends WSTransportHttpTraceTest {
3131

activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportNeedClientAuthTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,12 @@ public void testMQTTNeedClientAuth() throws Exception {
145145
MQTTWSConnection wsMQTTConnection = new MQTTWSConnection();
146146

147147
wsClient.connect(wsMQTTConnection, new URI("wss://localhost:61618"), request);
148+
148149
if (!wsMQTTConnection.awaitConnection(30, TimeUnit.SECONDS)) {
149150
throw new IOException("Could not connect to MQTT WS endpoint");
150151
}
151152

152153
wsMQTTConnection.connect();
153-
154154
assertTrue("Client not connected", wsMQTTConnection.isConnected());
155155

156156
wsMQTTConnection.disconnect();

activemq-http/src/test/java/org/apache/activemq/transport/xstream/XStreamWireFormatTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import junit.framework.Test;
2222
import org.apache.activemq.command.ActiveMQTextMessage;
23-
import org.apache.activemq.command.Command;
2423
import org.apache.activemq.command.MessageTest;
2524
import org.apache.activemq.wireformat.WireFormat;
2625
import org.slf4j.Logger;

activemq-http/src/test/resources/spring-http.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
<!-- lets wrap in a pool to avoid creating a connection per send -->
4545
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
4646
<property name="targetConnectionFactory">
47-
<ref local="jmsFactory" />
47+
<ref bean="jmsFactory" />
4848
</property>
4949
</bean>
5050
</property>

0 commit comments

Comments
 (0)