NodeStatusConverterFactory.java

  1. /*
  2.  * @copyright defined in LICENSE.txt
  3.  */

  4. package hera.transport;

  5. import static org.slf4j.LoggerFactory.getLogger;

  6. import com.fasterxml.jackson.core.JsonParser;
  7. import com.fasterxml.jackson.core.ObjectCodec;
  8. import com.fasterxml.jackson.databind.DeserializationContext;
  9. import com.fasterxml.jackson.databind.JsonDeserializer;
  10. import com.fasterxml.jackson.databind.JsonNode;
  11. import com.fasterxml.jackson.databind.ObjectMapper;
  12. import com.fasterxml.jackson.databind.module.SimpleModule;
  13. import hera.api.function.Function1;
  14. import hera.api.model.ModuleStatus;
  15. import hera.api.model.NodeStatus;
  16. import hera.api.model.internal.Time;
  17. import hera.exception.RpcException;
  18. import hera.util.ParsingUtils;
  19. import java.io.IOException;
  20. import java.text.ParseException;
  21. import java.util.ArrayList;
  22. import java.util.HashMap;
  23. import java.util.Iterator;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.concurrent.TimeUnit;
  27. import org.slf4j.Logger;
  28. import types.Rpc;

  29. public class NodeStatusConverterFactory {

  30.   protected final transient Logger logger = getLogger(getClass());

  31.   protected final ObjectMapper mapper = getObjectMapper();

  32.   protected final Function1<NodeStatus, Rpc.SingleBytes> domainConverter =
  33.       new Function1<NodeStatus, Rpc.SingleBytes>() {

  34.         @Override
  35.         public Rpc.SingleBytes apply(final NodeStatus domainNodeStatus) {
  36.           throw new UnsupportedOperationException();
  37.         }
  38.       };

  39.   protected final Function1<Rpc.SingleBytes, NodeStatus> rpcConverter =
  40.       new Function1<Rpc.SingleBytes, NodeStatus>() {

  41.         @Override
  42.         public NodeStatus apply(final Rpc.SingleBytes rpcNodeStatus) {
  43.           try {
  44.             logger.trace("Rpc node status to convert: {}", rpcNodeStatus);
  45.             final byte[] rawNodeStatus = rpcNodeStatus.getValue().toByteArray();
  46.             final NodeStatus domainNodeStatus = rawNodeStatus.length == 0
  47.                 ? NodeStatus.newBuilder().moduleStatus(new ArrayList<ModuleStatus>()).build()
  48.                 : mapper.readValue(rawNodeStatus, NodeStatus.class);
  49.             logger.trace("Domain node status converted: {}", domainNodeStatus);
  50.             return domainNodeStatus;
  51.           } catch (Throwable e) {
  52.             throw new RpcException(e);
  53.           }
  54.         }
  55.       };

  56.   public ModelConverter<NodeStatus, Rpc.SingleBytes> create() {
  57.     return new ModelConverter<NodeStatus, Rpc.SingleBytes>(domainConverter, rpcConverter);
  58.   }

  59.   protected ObjectMapper getObjectMapper() {
  60.     ObjectMapper objectMapper = new ObjectMapper();

  61.     SimpleModule simpleModule = new SimpleModule();
  62.     simpleModule.addDeserializer(NodeStatus.class, new NodeStatusDeserializer());
  63.     objectMapper.registerModule(simpleModule);

  64.     return objectMapper;
  65.   }

  66.   private class NodeStatusDeserializer extends JsonDeserializer<NodeStatus> {

  67.     @SuppressWarnings("unchecked")
  68.     @Override
  69.     public NodeStatus deserialize(JsonParser parser, DeserializationContext context)
  70.         throws IOException {
  71.       ObjectCodec objectCodec = parser.getCodec();
  72.       JsonNode nodeStatusNode = objectCodec.readTree(parser);

  73.       final List<ModuleStatus> moduleStatusList = new ArrayList<ModuleStatus>();
  74.       final Iterator<String> it = nodeStatusNode.fieldNames();
  75.       while (it.hasNext()) {
  76.         final String moduleName = it.next();
  77.         final JsonNode componentStatus = nodeStatusNode.get(moduleName);
  78.         final Map<String, Object> actor =
  79.             mapper.convertValue(componentStatus.get("actor"), Map.class);
  80.         final ModuleStatus moduleStatus = ModuleStatus.newBuilder()
  81.             .moduleName(moduleName)
  82.             .status(componentStatus.get("status").asText())
  83.             .processedMessageCount(componentStatus.get("acc_processed_msg").asLong())
  84.             .queuedMessageCount(componentStatus.get("msg_queue_len").asLong())
  85.             .latency(convertToTime(componentStatus.get("msg_latency").asText()))
  86.             .error(componentStatus.get("error").asText())
  87.             .actor(null == actor ? new HashMap<String, Object>() : actor)
  88.             .build();
  89.         moduleStatusList.add(moduleStatus);
  90.       }

  91.       return NodeStatus.newBuilder().moduleStatus(moduleStatusList).build();
  92.     }
  93.   }

  94.   /**
  95.    * Parse {@code val} and convert time in microseconds.
  96.    *
  97.    * @param val string to parse
  98.    *
  99.    * @return time in microseconds
  100.    *
  101.    * @throws IOException Fail to parse
  102.    *
  103.    * @see ParsingUtils#convertToTime(String)
  104.    */
  105.   protected Time convertToTime(final String val) throws IOException {
  106.     if (null == val) {
  107.       throw new IOException("Can't parse " + val);
  108.     }
  109.     try {
  110.       long time = ParsingUtils.convertToTime(val);
  111.       return Time.of(time, TimeUnit.MICROSECONDS);
  112.     } catch (ParseException e) {
  113.       throw new IOException("Can't parse " + val);
  114.     }
  115.   }

  116. }