BlockBaseTemplate.java

  1. /*
  2.  * @copyright defined in LICENSE.txt
  3.  */

  4. package hera.client.internal;

  5. import static hera.util.TransportUtils.copyFrom;
  6. import static hera.util.ValidationUtils.assertTrue;
  7. import static org.slf4j.LoggerFactory.getLogger;
  8. import static types.AergoRPCServiceGrpc.newFutureStub;
  9. import static types.AergoRPCServiceGrpc.newStub;

  10. import hera.ContextProvider;
  11. import hera.ContextProviderInjectable;
  12. import hera.annotation.ApiAudience;
  13. import hera.annotation.ApiStability;
  14. import hera.api.function.Function1;
  15. import hera.api.function.Function2;
  16. import hera.api.model.Block;
  17. import hera.api.model.BlockHash;
  18. import hera.api.model.BlockMetadata;
  19. import hera.api.model.Subscription;
  20. import hera.client.ChannelInjectable;
  21. import hera.client.stream.GrpcStreamObserverAdaptor;
  22. import hera.client.stream.GrpcStreamSubscription;
  23. import hera.transport.BlockConverterFactory;
  24. import hera.transport.BlockMetadataConverterFactory;
  25. import hera.transport.ModelConverter;
  26. import io.grpc.Context;
  27. import io.grpc.ManagedChannel;
  28. import java.util.ArrayList;
  29. import java.util.List;
  30. import java.util.concurrent.Future;
  31. import lombok.Getter;
  32. import org.slf4j.Logger;
  33. import types.AergoRPCServiceGrpc.AergoRPCServiceFutureStub;
  34. import types.AergoRPCServiceGrpc.AergoRPCServiceStub;
  35. import types.Blockchain;
  36. import types.Rpc;

  37. @ApiAudience.Private
  38. @ApiStability.Unstable
  39. public class BlockBaseTemplate implements ChannelInjectable, ContextProviderInjectable {

  40.   protected final transient Logger logger = getLogger(getClass());

  41.   protected final ModelConverter<BlockMetadata, types.Rpc.BlockMetadata> blockMetadataConverter =
  42.       new BlockMetadataConverterFactory().create();

  43.   protected final ModelConverter<Block, Blockchain.Block> blockConverter =
  44.       new BlockConverterFactory().create();

  45.   protected AergoRPCServiceFutureStub aergoService;
  46.   protected AergoRPCServiceStub streamService;

  47.   protected ContextProvider contextProvider;

  48.   @Override
  49.   public void setChannel(final ManagedChannel channel) {
  50.     this.aergoService = newFutureStub(channel);
  51.     this.streamService = newStub(channel);
  52.   }

  53.   @Override
  54.   public void setContextProvider(final ContextProvider contextProvider) {
  55.     this.contextProvider = contextProvider;
  56.   }

  57.   @Getter
  58.   private final Function1<BlockHash, Future<BlockMetadata>> blockMetatdataByHashFunction =
  59.       new Function1<BlockHash, Future<BlockMetadata>>() {

  60.         @Override
  61.         public Future<BlockMetadata> apply(final BlockHash hash) {
  62.           logger.debug("Get block metadata with hash: {}", hash);

  63.           final Rpc.SingleBytes rpcHash = Rpc.SingleBytes.newBuilder()
  64.               .setValue(copyFrom(hash.getBytesValue()))
  65.               .build();
  66.           logger.trace("AergoService getBlockMetadata arg: {}", rpcHash);

  67.           final Future<Rpc.BlockMetadata> rawFuture = aergoService.getBlockMetadata(rpcHash);
  68.           final Future<BlockMetadata> convertedFuture = HerajFutures.transform(rawFuture,
  69.               new Function1<Rpc.BlockMetadata, BlockMetadata>() {

  70.                 @Override
  71.                 public BlockMetadata apply(final Rpc.BlockMetadata metadata) {
  72.                   return blockMetadataConverter.convertToDomainModel(metadata);
  73.                 }
  74.               });
  75.           return convertedFuture;
  76.         }
  77.       };

  78.   @Getter
  79.   private final Function1<Long, Future<BlockMetadata>> blockMetadataByHeightFunction =
  80.       new Function1<Long, Future<BlockMetadata>>() {

  81.         @Override
  82.         public Future<BlockMetadata> apply(final Long height) {
  83.           logger.debug("Get block metadata with height: {}", height);
  84.           assertTrue(height >= 0, "Height must >= 0");

  85.           final Rpc.SingleBytes rpcHeight = Rpc.SingleBytes.newBuilder()
  86.               .setValue(copyFrom(height.longValue()))
  87.               .build();
  88.           logger.trace("AergoService getBlockMetadata arg: {}", rpcHeight);

  89.           final Future<Rpc.BlockMetadata> rawFuture = aergoService.getBlockMetadata(rpcHeight);
  90.           final Future<BlockMetadata> convertedFuture =
  91.               HerajFutures.transform(rawFuture, new Function1<Rpc.BlockMetadata, BlockMetadata>() {

  92.                 @Override
  93.                 public BlockMetadata apply(final Rpc.BlockMetadata metadata) {
  94.                   return blockMetadataConverter.convertToDomainModel(metadata);
  95.                 }
  96.               });
  97.           return convertedFuture;
  98.         }
  99.       };

  100.   @Getter
  101.   private final Function2<BlockHash, Integer,
  102.       Future<List<BlockMetadata>>> listBlockMetadatasByHashFunction = new Function2<
  103.           BlockHash, Integer, Future<List<BlockMetadata>>>() {

  104.         @Override
  105.         public Future<List<BlockMetadata>> apply(final BlockHash hash,
  106.             final Integer size) {
  107.           logger.debug("List block meta datas with hash: {}, size: {}", hash, size);
  108.           assertTrue(size > 0, "Block list size must be postive");

  109.           final Rpc.ListParams rpcHashAndSize = Rpc.ListParams.newBuilder()
  110.               .setHash(copyFrom(hash.getBytesValue()))
  111.               .setSize(size)
  112.               .build();
  113.           logger.trace("AergoService listBlockMetadata arg: {}", rpcHashAndSize);

  114.           final Future<Rpc.BlockMetadataList> rawFuture =
  115.               aergoService.listBlockMetadata(rpcHashAndSize);
  116.           final Future<List<BlockMetadata>> convertedFuture = HerajFutures.transform(rawFuture,
  117.               new Function1<Rpc.BlockMetadataList, List<BlockMetadata>>() {

  118.                 @Override
  119.                 public List<BlockMetadata> apply(final Rpc.BlockMetadataList rpcMetadatas) {
  120.                   final List<BlockMetadata> blockMetadatas = new ArrayList<>();
  121.                   for (final Rpc.BlockMetadata rpcBlockMetadata : rpcMetadatas.getBlocksList()) {
  122.                     blockMetadatas
  123.                         .add(blockMetadataConverter.convertToDomainModel(rpcBlockMetadata));
  124.                   }
  125.                   return blockMetadatas;
  126.                 }
  127.               });
  128.           return convertedFuture;
  129.         }
  130.       };

  131.   @Getter
  132.   private final Function2<Long, Integer,
  133.       Future<List<BlockMetadata>>> listBlockMetadatasByHeightFunction = new Function2<Long, Integer,
  134.           Future<List<BlockMetadata>>>() {

  135.         @Override
  136.         public Future<List<BlockMetadata>> apply(final Long height,
  137.             final Integer size) {
  138.           logger.debug("List block meta datas with height: {}, size: {}", height, size);
  139.           assertTrue(height >= 0, "Height must >= 0");
  140.           assertTrue(size > 0, "Block list size must be postive");

  141.           final Rpc.ListParams rpcHeightAndSize = Rpc.ListParams.newBuilder()
  142.               .setHeight(height)
  143.               .setSize(size)
  144.               .build();
  145.           logger.trace("AergoService listBlockMetadata arg: {}", rpcHeightAndSize);

  146.           final Future<Rpc.BlockMetadataList> rawFuture =
  147.               aergoService.listBlockMetadata(rpcHeightAndSize);
  148.           final Future<List<BlockMetadata>> convertedFuture = HerajFutures.transform(rawFuture,
  149.               new Function1<Rpc.BlockMetadataList, List<BlockMetadata>>() {
  150.                 @Override
  151.                 public List<BlockMetadata> apply(final Rpc.BlockMetadataList rpcMetadatas) {
  152.                   final List<BlockMetadata> blockHeaders = new ArrayList<>();
  153.                   for (final Rpc.BlockMetadata rpcBlockMetadata : rpcMetadatas
  154.                       .getBlocksList()) {
  155.                     blockHeaders
  156.                         .add(blockMetadataConverter.convertToDomainModel(rpcBlockMetadata));
  157.                   }
  158.                   return blockHeaders;
  159.                 }
  160.               });
  161.           return convertedFuture;
  162.         }
  163.       };

  164.   @Getter
  165.   private final Function1<BlockHash, Future<Block>> blockByHashFunction =
  166.       new Function1<BlockHash, Future<Block>>() {

  167.         @Override
  168.         public Future<Block> apply(final BlockHash hash) {
  169.           logger.debug("Get block with hash: {}", hash);

  170.           final Rpc.SingleBytes rpcHash = Rpc.SingleBytes.newBuilder()
  171.               .setValue(copyFrom(hash.getBytesValue()))
  172.               .build();
  173.           logger.trace("AergoService getBlock arg: {}", rpcHash);

  174.           final Future<Blockchain.Block> rawFuture = aergoService.getBlock(rpcHash);
  175.           final Future<Block> convertedFuture = HerajFutures.transform(rawFuture,
  176.               new Function1<Blockchain.Block, Block>() {
  177.                 @Override
  178.                 public Block apply(final Blockchain.Block block) {
  179.                   return blockConverter.convertToDomainModel(block);
  180.                 }
  181.               });
  182.           return convertedFuture;
  183.         }
  184.       };

  185.   @Getter
  186.   private final Function1<Long, Future<Block>> blockByHeightFunction =
  187.       new Function1<Long, Future<Block>>() {

  188.         @Override
  189.         public Future<Block> apply(final Long height) {
  190.           logger.debug("Get block with height: {}", height);
  191.           assertTrue(height >= 0, "Height must be >= 0");

  192.           final Rpc.SingleBytes rpcHeight = Rpc.SingleBytes.newBuilder()
  193.               .setValue(copyFrom(height.longValue()))
  194.               .build();
  195.           logger.trace("AergoService getBlock arg: {}", rpcHeight);

  196.           final Future<Blockchain.Block> rawFuture = aergoService.getBlock(rpcHeight);
  197.           final Future<Block> convertedFuture = HerajFutures.transform(rawFuture,
  198.               new Function1<Blockchain.Block, Block>() {
  199.                 @Override
  200.                 public Block apply(final Blockchain.Block block) {
  201.                   return blockConverter.convertToDomainModel(block);
  202.                 }
  203.               });
  204.           return convertedFuture;
  205.         }
  206.       };

  207.   @Getter
  208.   private final Function1<hera.api.model.StreamObserver<BlockMetadata>,
  209.       Future<Subscription<BlockMetadata>>> subscribeBlockMetadataFunction = new Function1<
  210.           hera.api.model.StreamObserver<BlockMetadata>,
  211.           Future<Subscription<BlockMetadata>>>() {

  212.         @Override
  213.         public Future<Subscription<BlockMetadata>> apply(
  214.             final hera.api.model.StreamObserver<BlockMetadata> observer) {

  215.           logger.debug("Subscribe block metadata stream with observer {}", observer);

  216.           final Rpc.Empty blockMetadataStreamRequest = Rpc.Empty.newBuilder().build();
  217.           Context.CancellableContext cancellableContext = Context.current().withCancellation();
  218.           final io.grpc.stub.StreamObserver<Rpc.BlockMetadata> adaptor =
  219.               new GrpcStreamObserverAdaptor<Rpc.BlockMetadata, BlockMetadata>(cancellableContext,
  220.                   observer, blockMetadataConverter);
  221.           cancellableContext.run(new Runnable() {
  222.             @Override
  223.             public void run() {
  224.               streamService.listBlockMetadataStream(blockMetadataStreamRequest, adaptor);
  225.             }
  226.           });

  227.           final Subscription<BlockMetadata> subscription =
  228.               new GrpcStreamSubscription<>(cancellableContext);
  229.           return HerajFutures.success(subscription);
  230.         }
  231.       };

  232.   @Getter
  233.   private final Function1<hera.api.model.StreamObserver<Block>,
  234.       Future<Subscription<Block>>> subscribeBlockFunction = new Function1<
  235.           hera.api.model.StreamObserver<Block>, Future<Subscription<Block>>>() {

  236.         @Override
  237.         public Future<Subscription<Block>> apply(
  238.             final hera.api.model.StreamObserver<Block> observer) {
  239.           logger.debug("Subscribe block metadata stream with observer {}", observer);

  240.           final Rpc.Empty blockStreamRequest = Rpc.Empty.newBuilder().build();
  241.           Context.CancellableContext cancellableContext = Context.current().withCancellation();
  242.           final io.grpc.stub.StreamObserver<Blockchain.Block> adaptor =
  243.               new GrpcStreamObserverAdaptor<Blockchain.Block, Block>(cancellableContext,
  244.                   observer, blockConverter);
  245.           cancellableContext.run(new Runnable() {
  246.             @Override
  247.             public void run() {
  248.               streamService.listBlockStream(blockStreamRequest, adaptor);
  249.             }
  250.           });

  251.           final Subscription<Block> subscription = new GrpcStreamSubscription<>(cancellableContext);
  252.           return HerajFutures.success(subscription);
  253.         }
  254.       };

  255. }