BlockBaseTemplate.java

/*
 * @copyright defined in LICENSE.txt
 */

package hera.client.internal;

import static hera.util.TransportUtils.copyFrom;
import static hera.util.ValidationUtils.assertTrue;
import static org.slf4j.LoggerFactory.getLogger;
import static types.AergoRPCServiceGrpc.newFutureStub;
import static types.AergoRPCServiceGrpc.newStub;

import hera.ContextProvider;
import hera.ContextProviderInjectable;
import hera.annotation.ApiAudience;
import hera.annotation.ApiStability;
import hera.api.function.Function1;
import hera.api.function.Function2;
import hera.api.model.Block;
import hera.api.model.BlockHash;
import hera.api.model.BlockMetadata;
import hera.api.model.Subscription;
import hera.client.ChannelInjectable;
import hera.client.stream.GrpcStreamObserverAdaptor;
import hera.client.stream.GrpcStreamSubscription;
import hera.transport.BlockConverterFactory;
import hera.transport.BlockMetadataConverterFactory;
import hera.transport.ModelConverter;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import lombok.Getter;
import org.slf4j.Logger;
import types.AergoRPCServiceGrpc.AergoRPCServiceFutureStub;
import types.AergoRPCServiceGrpc.AergoRPCServiceStub;
import types.Blockchain;
import types.Rpc;

@ApiAudience.Private
@ApiStability.Unstable
public class BlockBaseTemplate implements ChannelInjectable, ContextProviderInjectable {

  protected final transient Logger logger = getLogger(getClass());

  protected final ModelConverter<BlockMetadata, types.Rpc.BlockMetadata> blockMetadataConverter =
      new BlockMetadataConverterFactory().create();

  protected final ModelConverter<Block, Blockchain.Block> blockConverter =
      new BlockConverterFactory().create();

  protected AergoRPCServiceFutureStub aergoService;
  protected AergoRPCServiceStub streamService;

  protected ContextProvider contextProvider;

  @Override
  public void setChannel(final ManagedChannel channel) {
    this.aergoService = newFutureStub(channel);
    this.streamService = newStub(channel);
  }

  @Override
  public void setContextProvider(final ContextProvider contextProvider) {
    this.contextProvider = contextProvider;
  }

  @Getter
  private final Function1<BlockHash, Future<BlockMetadata>> blockMetatdataByHashFunction =
      new Function1<BlockHash, Future<BlockMetadata>>() {

        @Override
        public Future<BlockMetadata> apply(final BlockHash hash) {
          logger.debug("Get block metadata with hash: {}", hash);

          final Rpc.SingleBytes rpcHash = Rpc.SingleBytes.newBuilder()
              .setValue(copyFrom(hash.getBytesValue()))
              .build();
          logger.trace("AergoService getBlockMetadata arg: {}", rpcHash);

          final Future<Rpc.BlockMetadata> rawFuture = aergoService.getBlockMetadata(rpcHash);
          final Future<BlockMetadata> convertedFuture = HerajFutures.transform(rawFuture,
              new Function1<Rpc.BlockMetadata, BlockMetadata>() {

                @Override
                public BlockMetadata apply(final Rpc.BlockMetadata metadata) {
                  return blockMetadataConverter.convertToDomainModel(metadata);
                }
              });
          return convertedFuture;
        }
      };

  @Getter
  private final Function1<Long, Future<BlockMetadata>> blockMetadataByHeightFunction =
      new Function1<Long, Future<BlockMetadata>>() {

        @Override
        public Future<BlockMetadata> apply(final Long height) {
          logger.debug("Get block metadata with height: {}", height);
          assertTrue(height >= 0, "Height must >= 0");

          final Rpc.SingleBytes rpcHeight = Rpc.SingleBytes.newBuilder()
              .setValue(copyFrom(height.longValue()))
              .build();
          logger.trace("AergoService getBlockMetadata arg: {}", rpcHeight);

          final Future<Rpc.BlockMetadata> rawFuture = aergoService.getBlockMetadata(rpcHeight);
          final Future<BlockMetadata> convertedFuture =
              HerajFutures.transform(rawFuture, new Function1<Rpc.BlockMetadata, BlockMetadata>() {

                @Override
                public BlockMetadata apply(final Rpc.BlockMetadata metadata) {
                  return blockMetadataConverter.convertToDomainModel(metadata);
                }
              });
          return convertedFuture;
        }
      };

  @Getter
  private final Function2<BlockHash, Integer,
      Future<List<BlockMetadata>>> listBlockMetadatasByHashFunction = new Function2<
          BlockHash, Integer, Future<List<BlockMetadata>>>() {

        @Override
        public Future<List<BlockMetadata>> apply(final BlockHash hash,
            final Integer size) {
          logger.debug("List block meta datas with hash: {}, size: {}", hash, size);
          assertTrue(size > 0, "Block list size must be postive");

          final Rpc.ListParams rpcHashAndSize = Rpc.ListParams.newBuilder()
              .setHash(copyFrom(hash.getBytesValue()))
              .setSize(size)
              .build();
          logger.trace("AergoService listBlockMetadata arg: {}", rpcHashAndSize);

          final Future<Rpc.BlockMetadataList> rawFuture =
              aergoService.listBlockMetadata(rpcHashAndSize);
          final Future<List<BlockMetadata>> convertedFuture = HerajFutures.transform(rawFuture,
              new Function1<Rpc.BlockMetadataList, List<BlockMetadata>>() {

                @Override
                public List<BlockMetadata> apply(final Rpc.BlockMetadataList rpcMetadatas) {
                  final List<BlockMetadata> blockMetadatas = new ArrayList<>();
                  for (final Rpc.BlockMetadata rpcBlockMetadata : rpcMetadatas.getBlocksList()) {
                    blockMetadatas
                        .add(blockMetadataConverter.convertToDomainModel(rpcBlockMetadata));
                  }
                  return blockMetadatas;
                }
              });
          return convertedFuture;
        }
      };

  @Getter
  private final Function2<Long, Integer,
      Future<List<BlockMetadata>>> listBlockMetadatasByHeightFunction = new Function2<Long, Integer,
          Future<List<BlockMetadata>>>() {

        @Override
        public Future<List<BlockMetadata>> apply(final Long height,
            final Integer size) {
          logger.debug("List block meta datas with height: {}, size: {}", height, size);
          assertTrue(height >= 0, "Height must >= 0");
          assertTrue(size > 0, "Block list size must be postive");

          final Rpc.ListParams rpcHeightAndSize = Rpc.ListParams.newBuilder()
              .setHeight(height)
              .setSize(size)
              .build();
          logger.trace("AergoService listBlockMetadata arg: {}", rpcHeightAndSize);

          final Future<Rpc.BlockMetadataList> rawFuture =
              aergoService.listBlockMetadata(rpcHeightAndSize);
          final Future<List<BlockMetadata>> convertedFuture = HerajFutures.transform(rawFuture,
              new Function1<Rpc.BlockMetadataList, List<BlockMetadata>>() {
                @Override
                public List<BlockMetadata> apply(final Rpc.BlockMetadataList rpcMetadatas) {
                  final List<BlockMetadata> blockHeaders = new ArrayList<>();
                  for (final Rpc.BlockMetadata rpcBlockMetadata : rpcMetadatas
                      .getBlocksList()) {
                    blockHeaders
                        .add(blockMetadataConverter.convertToDomainModel(rpcBlockMetadata));
                  }
                  return blockHeaders;
                }
              });
          return convertedFuture;
        }
      };

  @Getter
  private final Function1<BlockHash, Future<Block>> blockByHashFunction =
      new Function1<BlockHash, Future<Block>>() {

        @Override
        public Future<Block> apply(final BlockHash hash) {
          logger.debug("Get block with hash: {}", hash);

          final Rpc.SingleBytes rpcHash = Rpc.SingleBytes.newBuilder()
              .setValue(copyFrom(hash.getBytesValue()))
              .build();
          logger.trace("AergoService getBlock arg: {}", rpcHash);

          final Future<Blockchain.Block> rawFuture = aergoService.getBlock(rpcHash);
          final Future<Block> convertedFuture = HerajFutures.transform(rawFuture,
              new Function1<Blockchain.Block, Block>() {
                @Override
                public Block apply(final Blockchain.Block block) {
                  return blockConverter.convertToDomainModel(block);
                }
              });
          return convertedFuture;
        }
      };

  @Getter
  private final Function1<Long, Future<Block>> blockByHeightFunction =
      new Function1<Long, Future<Block>>() {

        @Override
        public Future<Block> apply(final Long height) {
          logger.debug("Get block with height: {}", height);
          assertTrue(height >= 0, "Height must be >= 0");

          final Rpc.SingleBytes rpcHeight = Rpc.SingleBytes.newBuilder()
              .setValue(copyFrom(height.longValue()))
              .build();
          logger.trace("AergoService getBlock arg: {}", rpcHeight);

          final Future<Blockchain.Block> rawFuture = aergoService.getBlock(rpcHeight);
          final Future<Block> convertedFuture = HerajFutures.transform(rawFuture,
              new Function1<Blockchain.Block, Block>() {
                @Override
                public Block apply(final Blockchain.Block block) {
                  return blockConverter.convertToDomainModel(block);
                }
              });
          return convertedFuture;
        }
      };

  @Getter
  private final Function1<hera.api.model.StreamObserver<BlockMetadata>,
      Future<Subscription<BlockMetadata>>> subscribeBlockMetadataFunction = new Function1<
          hera.api.model.StreamObserver<BlockMetadata>,
          Future<Subscription<BlockMetadata>>>() {

        @Override
        public Future<Subscription<BlockMetadata>> apply(
            final hera.api.model.StreamObserver<BlockMetadata> observer) {

          logger.debug("Subscribe block metadata stream with observer {}", observer);

          final Rpc.Empty blockMetadataStreamRequest = Rpc.Empty.newBuilder().build();
          Context.CancellableContext cancellableContext = Context.current().withCancellation();
          final io.grpc.stub.StreamObserver<Rpc.BlockMetadata> adaptor =
              new GrpcStreamObserverAdaptor<Rpc.BlockMetadata, BlockMetadata>(cancellableContext,
                  observer, blockMetadataConverter);
          cancellableContext.run(new Runnable() {
            @Override
            public void run() {
              streamService.listBlockMetadataStream(blockMetadataStreamRequest, adaptor);
            }
          });

          final Subscription<BlockMetadata> subscription =
              new GrpcStreamSubscription<>(cancellableContext);
          return HerajFutures.success(subscription);
        }
      };

  @Getter
  private final Function1<hera.api.model.StreamObserver<Block>,
      Future<Subscription<Block>>> subscribeBlockFunction = new Function1<
          hera.api.model.StreamObserver<Block>, Future<Subscription<Block>>>() {

        @Override
        public Future<Subscription<Block>> apply(
            final hera.api.model.StreamObserver<Block> observer) {
          logger.debug("Subscribe block metadata stream with observer {}", observer);

          final Rpc.Empty blockStreamRequest = Rpc.Empty.newBuilder().build();
          Context.CancellableContext cancellableContext = Context.current().withCancellation();
          final io.grpc.stub.StreamObserver<Blockchain.Block> adaptor =
              new GrpcStreamObserverAdaptor<Blockchain.Block, Block>(cancellableContext,
                  observer, blockConverter);
          cancellableContext.run(new Runnable() {
            @Override
            public void run() {
              streamService.listBlockStream(blockStreamRequest, adaptor);
            }
          });

          final Subscription<Block> subscription = new GrpcStreamSubscription<>(cancellableContext);
          return HerajFutures.success(subscription);
        }
      };

}