GrpcStreamObserverAdaptor.java

/*
 * @copyright defined in LICENSE.txt
 */

package hera.client.stream;

import static org.slf4j.LoggerFactory.getLogger;

import hera.exception.RpcConnectionException;
import hera.exception.RpcException;
import hera.exception.RpcExceptionConverter;
import hera.transport.ModelConverter;
import hera.util.ExceptionConverter;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;

@RequiredArgsConstructor
public class GrpcStreamObserverAdaptor<RpcModelT, DomainModelT>
    implements io.grpc.stub.StreamObserver<RpcModelT> {

  protected final transient Logger logger = getLogger(getClass());

  protected final ExceptionConverter<RpcException> exceptionConverter = new RpcExceptionConverter();

  protected final io.grpc.Context.CancellableContext context;

  protected final hera.api.model.StreamObserver<DomainModelT> delegate;

  protected final ModelConverter<DomainModelT, RpcModelT> converter;

  @Override
  public void onNext(final RpcModelT value) {
    final DomainModelT converted = converter.convertToDomainModel(value);
    logger.info("Streaming next: {}", converted);
    delegate.onNext(converted);
  }

  @Override
  public void onError(final Throwable t) {
    final RpcException converted = exceptionConverter.convert(t);
    logger.error("Streaming failed by {}", converted.toString());
    if (converted instanceof RpcConnectionException) {
      logger.info("Stop subscription by connection error");
      context.cancel(converted);
    }
    delegate.onError(t);
  }

  @Override
  public void onCompleted() {
    logger.info("Streaming finished successfully");
    delegate.onCompleted();
  }

}