[Major] Allow WebSocket* to be extensible

This commit is contained in:
Robert von Burg 2020-01-21 15:37:14 +01:00
parent 647a704c41
commit 4b2e8b9f81
3 changed files with 38 additions and 21 deletions

View File

@ -116,10 +116,14 @@ public class WebSocketClient implements MessageHandler.Whole<String> {
this.observerHandlersByRealm.computeIfAbsent(objectType, s -> { this.observerHandlersByRealm.computeIfAbsent(objectType, s -> {
ObserverHandler observerHandler = this.container.getRealm(realm).getObserverHandler(); ObserverHandler observerHandler = this.container.getRealm(realm).getObserverHandler();
return new WebSocketObserverHandler(observerHandler, this); return getWebSocketObserverHandler(observerHandler);
}).register(objectType, type, flat); }).register(objectType, type, flat);
} }
protected WebSocketObserverHandler getWebSocketObserverHandler(ObserverHandler observerHandler) {
return new WebSocketObserverHandler(observerHandler, this);
}
private void handleUnregister(JsonObject jsonObject) { private void handleUnregister(JsonObject jsonObject) {
String realm = jsonObject.get(Tags.Json.REALM).getAsString(); String realm = jsonObject.get(Tags.Json.REALM).getAsString();
String objectType = jsonObject.get(Tags.Json.OBJECT_TYPE).getAsString(); String objectType = jsonObject.get(Tags.Json.OBJECT_TYPE).getAsString();
@ -165,17 +169,17 @@ public class WebSocketClient implements MessageHandler.Whole<String> {
} }
private void close(CloseReason.CloseCode code, String reason) { private void close(CloseReason.CloseCode code, String reason) {
close(new CloseReason(code, reason));
}
public void close(CloseReason closeReason) {
if (this.observerHandlersByRealm == null || this.observerHandlersByRealm.isEmpty())
return;
try { try {
this.session.close(new CloseReason(code, reason)); this.session.close(new CloseReason(closeReason.getCloseCode(), closeReason.getReasonPhrase()));
} catch (IOException e) { } catch (IOException e) {
logger.error("Failed to close client after invalid authentication!", e); logger.error("Failed to close client after invalid authentication!", e);
} }
}
public void close() {
if (this.observerHandlersByRealm == null || this.observerHandlersByRealm.isEmpty())
return;
this.observerHandlersByRealm.keySet().forEach(realm -> { this.observerHandlersByRealm.keySet().forEach(realm -> {
this.observerHandlersByRealm.get(realm).unregisterAll(); this.observerHandlersByRealm.get(realm).unregisterAll();
}); });

View File

@ -24,7 +24,7 @@ public class WebSocketEndpoint {
public void onClose(Session session, CloseReason closeReason) { public void onClose(Session session, CloseReason closeReason) {
WebSocketClient webSocketClient = this.clientMap.remove(session); WebSocketClient webSocketClient = this.clientMap.remove(session);
if (webSocketClient != null) if (webSocketClient != null)
webSocketClient.close(); webSocketClient.close(closeReason);
} }
@OnError @OnError

View File

@ -17,14 +17,18 @@ import li.strolch.model.StrolchRootElement;
import li.strolch.model.Tags; import li.strolch.model.Tags;
import li.strolch.utils.collections.MapOfLists; import li.strolch.utils.collections.MapOfLists;
import li.strolch.utils.collections.MapOfSets; import li.strolch.utils.collections.MapOfSets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebSocketObserverHandler implements Observer { public class WebSocketObserverHandler implements Observer {
private ObserverHandler observerHandler; protected static final Logger logger = LoggerFactory.getLogger(WebSocketObserverHandler.class);
private WebSocketClient client;
private MapOfSets<String, String> observedTypes; protected ObserverHandler observerHandler;
private Map<String, Boolean> asFlat; protected WebSocketClient client;
protected MapOfSets<String, String> observedTypes;
protected Map<String, Boolean> asFlat;
public WebSocketObserverHandler(ObserverHandler observerHandler, WebSocketClient client) { public WebSocketObserverHandler(ObserverHandler observerHandler, WebSocketClient client) {
this.observerHandler = observerHandler; this.observerHandler = observerHandler;
@ -67,18 +71,17 @@ public class WebSocketObserverHandler implements Observer {
handleUpdate("ObserverRemove", key, elements); handleUpdate("ObserverRemove", key, elements);
} }
private void handleUpdate(String updateType, String key, List<StrolchRootElement> elements) { protected void handleUpdate(String updateType, String key, List<StrolchRootElement> elements) {
Set<String> observedTypesSet = this.observedTypes.getSet(key); Set<String> observedTypesSet = this.observedTypes.getSet(key);
if (observedTypesSet == null) if (observedTypesSet == null)
return; return;
MapOfLists<String, JsonObject> data = elements.stream().filter(e -> observedTypesSet.contains(e.getType())) MapOfLists<String, JsonObject> data = elements //
.map(e -> { .stream() //
if (this.asFlat.get(e.getType())) .filter(e -> filter(observedTypesSet, e)) //
return e.toFlatJsonObject(); .map(this::toJson) //
else .collect(MapOfLists::new, //
return e.toJsonObject(); (mol, e) -> mol.addElement(e.get(Tags.Json.TYPE).getAsString(), e), //
}).collect(MapOfLists::new, (mol, e) -> mol.addElement(e.get(Tags.Json.TYPE).getAsString(), e),
MapOfLists::addAll); MapOfLists::addAll);
if (data.isEmpty()) if (data.isEmpty())
return; return;
@ -95,4 +98,14 @@ public class WebSocketObserverHandler implements Observer {
this.client.sendMessage(jsonObject.toString()); this.client.sendMessage(jsonObject.toString());
}); });
} }
protected boolean filter(Set<String> observedTypesSet, StrolchRootElement e) {
return observedTypesSet.contains(e.getType());
}
protected JsonObject toJson(StrolchRootElement e) {
if (this.asFlat.get(e.getType()))
return e.toFlatJsonObject();
return e.toJsonObject();
}
} }