NodeStatusConverterFactory.java
/*
* @copyright defined in LICENSE.txt
*/
package hera.transport;
import static org.slf4j.LoggerFactory.getLogger;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import hera.api.function.Function1;
import hera.api.model.ModuleStatus;
import hera.api.model.NodeStatus;
import hera.api.model.internal.Time;
import hera.exception.RpcException;
import hera.util.ParsingUtils;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import types.Rpc;
public class NodeStatusConverterFactory {
protected final transient Logger logger = getLogger(getClass());
protected final ObjectMapper mapper = getObjectMapper();
protected final Function1<NodeStatus, Rpc.SingleBytes> domainConverter =
new Function1<NodeStatus, Rpc.SingleBytes>() {
@Override
public Rpc.SingleBytes apply(final NodeStatus domainNodeStatus) {
throw new UnsupportedOperationException();
}
};
protected final Function1<Rpc.SingleBytes, NodeStatus> rpcConverter =
new Function1<Rpc.SingleBytes, NodeStatus>() {
@Override
public NodeStatus apply(final Rpc.SingleBytes rpcNodeStatus) {
try {
logger.trace("Rpc node status to convert: {}", rpcNodeStatus);
final byte[] rawNodeStatus = rpcNodeStatus.getValue().toByteArray();
final NodeStatus domainNodeStatus = rawNodeStatus.length == 0
? NodeStatus.newBuilder().moduleStatus(new ArrayList<ModuleStatus>()).build()
: mapper.readValue(rawNodeStatus, NodeStatus.class);
logger.trace("Domain node status converted: {}", domainNodeStatus);
return domainNodeStatus;
} catch (Throwable e) {
throw new RpcException(e);
}
}
};
public ModelConverter<NodeStatus, Rpc.SingleBytes> create() {
return new ModelConverter<NodeStatus, Rpc.SingleBytes>(domainConverter, rpcConverter);
}
protected ObjectMapper getObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
SimpleModule simpleModule = new SimpleModule();
simpleModule.addDeserializer(NodeStatus.class, new NodeStatusDeserializer());
objectMapper.registerModule(simpleModule);
return objectMapper;
}
private class NodeStatusDeserializer extends JsonDeserializer<NodeStatus> {
@SuppressWarnings("unchecked")
@Override
public NodeStatus deserialize(JsonParser parser, DeserializationContext context)
throws IOException {
ObjectCodec objectCodec = parser.getCodec();
JsonNode nodeStatusNode = objectCodec.readTree(parser);
final List<ModuleStatus> moduleStatusList = new ArrayList<ModuleStatus>();
final Iterator<String> it = nodeStatusNode.fieldNames();
while (it.hasNext()) {
final String moduleName = it.next();
final JsonNode componentStatus = nodeStatusNode.get(moduleName);
final Map<String, Object> actor =
mapper.convertValue(componentStatus.get("actor"), Map.class);
final ModuleStatus moduleStatus = ModuleStatus.newBuilder()
.moduleName(moduleName)
.status(componentStatus.get("status").asText())
.processedMessageCount(componentStatus.get("acc_processed_msg").asLong())
.queuedMessageCount(componentStatus.get("msg_queue_len").asLong())
.latency(convertToTime(componentStatus.get("msg_latency").asText()))
.error(componentStatus.get("error").asText())
.actor(null == actor ? new HashMap<String, Object>() : actor)
.build();
moduleStatusList.add(moduleStatus);
}
return NodeStatus.newBuilder().moduleStatus(moduleStatusList).build();
}
}
/**
* Parse {@code val} and convert time in microseconds.
*
* @param val string to parse
*
* @return time in microseconds
*
* @throws IOException Fail to parse
*
* @see ParsingUtils#convertToTime(String)
*/
protected Time convertToTime(final String val) throws IOException {
if (null == val) {
throw new IOException("Can't parse " + val);
}
try {
long time = ParsingUtils.convertToTime(val);
return Time.of(time, TimeUnit.MICROSECONDS);
} catch (ParseException e) {
throw new IOException("Can't parse " + val);
}
}
}