NodeStatusConverterFactory.java
- /*
- * @copyright defined in LICENSE.txt
- */
- package hera.transport;
- import static org.slf4j.LoggerFactory.getLogger;
- import com.fasterxml.jackson.core.JsonParser;
- import com.fasterxml.jackson.core.ObjectCodec;
- import com.fasterxml.jackson.databind.DeserializationContext;
- import com.fasterxml.jackson.databind.JsonDeserializer;
- import com.fasterxml.jackson.databind.JsonNode;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.fasterxml.jackson.databind.module.SimpleModule;
- import hera.api.function.Function1;
- import hera.api.model.ModuleStatus;
- import hera.api.model.NodeStatus;
- import hera.api.model.internal.Time;
- import hera.exception.RpcException;
- import hera.util.ParsingUtils;
- import java.io.IOException;
- import java.text.ParseException;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.TimeUnit;
- import org.slf4j.Logger;
- import types.Rpc;
- public class NodeStatusConverterFactory {
- protected final transient Logger logger = getLogger(getClass());
- protected final ObjectMapper mapper = getObjectMapper();
- protected final Function1<NodeStatus, Rpc.SingleBytes> domainConverter =
- new Function1<NodeStatus, Rpc.SingleBytes>() {
- @Override
- public Rpc.SingleBytes apply(final NodeStatus domainNodeStatus) {
- throw new UnsupportedOperationException();
- }
- };
- protected final Function1<Rpc.SingleBytes, NodeStatus> rpcConverter =
- new Function1<Rpc.SingleBytes, NodeStatus>() {
- @Override
- public NodeStatus apply(final Rpc.SingleBytes rpcNodeStatus) {
- try {
- logger.trace("Rpc node status to convert: {}", rpcNodeStatus);
- final byte[] rawNodeStatus = rpcNodeStatus.getValue().toByteArray();
- final NodeStatus domainNodeStatus = rawNodeStatus.length == 0
- ? NodeStatus.newBuilder().moduleStatus(new ArrayList<ModuleStatus>()).build()
- : mapper.readValue(rawNodeStatus, NodeStatus.class);
- logger.trace("Domain node status converted: {}", domainNodeStatus);
- return domainNodeStatus;
- } catch (Throwable e) {
- throw new RpcException(e);
- }
- }
- };
- public ModelConverter<NodeStatus, Rpc.SingleBytes> create() {
- return new ModelConverter<NodeStatus, Rpc.SingleBytes>(domainConverter, rpcConverter);
- }
- protected ObjectMapper getObjectMapper() {
- ObjectMapper objectMapper = new ObjectMapper();
- SimpleModule simpleModule = new SimpleModule();
- simpleModule.addDeserializer(NodeStatus.class, new NodeStatusDeserializer());
- objectMapper.registerModule(simpleModule);
- return objectMapper;
- }
- private class NodeStatusDeserializer extends JsonDeserializer<NodeStatus> {
- @SuppressWarnings("unchecked")
- @Override
- public NodeStatus deserialize(JsonParser parser, DeserializationContext context)
- throws IOException {
- ObjectCodec objectCodec = parser.getCodec();
- JsonNode nodeStatusNode = objectCodec.readTree(parser);
- final List<ModuleStatus> moduleStatusList = new ArrayList<ModuleStatus>();
- final Iterator<String> it = nodeStatusNode.fieldNames();
- while (it.hasNext()) {
- final String moduleName = it.next();
- final JsonNode componentStatus = nodeStatusNode.get(moduleName);
- final Map<String, Object> actor =
- mapper.convertValue(componentStatus.get("actor"), Map.class);
- final ModuleStatus moduleStatus = ModuleStatus.newBuilder()
- .moduleName(moduleName)
- .status(componentStatus.get("status").asText())
- .processedMessageCount(componentStatus.get("acc_processed_msg").asLong())
- .queuedMessageCount(componentStatus.get("msg_queue_len").asLong())
- .latency(convertToTime(componentStatus.get("msg_latency").asText()))
- .error(componentStatus.get("error").asText())
- .actor(null == actor ? new HashMap<String, Object>() : actor)
- .build();
- moduleStatusList.add(moduleStatus);
- }
- return NodeStatus.newBuilder().moduleStatus(moduleStatusList).build();
- }
- }
- /**
- * Parse {@code val} and convert time in microseconds.
- *
- * @param val string to parse
- *
- * @return time in microseconds
- *
- * @throws IOException Fail to parse
- *
- * @see ParsingUtils#convertToTime(String)
- */
- protected Time convertToTime(final String val) throws IOException {
- if (null == val) {
- throw new IOException("Can't parse " + val);
- }
- try {
- long time = ParsingUtils.convertToTime(val);
- return Time.of(time, TimeUnit.MICROSECONDS);
- } catch (ParseException e) {
- throw new IOException("Can't parse " + val);
- }
- }
- }