BlockBaseTemplate.java
- /*
- * @copyright defined in LICENSE.txt
- */
- package hera.client.internal;
- import static hera.util.TransportUtils.copyFrom;
- import static hera.util.ValidationUtils.assertTrue;
- import static org.slf4j.LoggerFactory.getLogger;
- import static types.AergoRPCServiceGrpc.newFutureStub;
- import static types.AergoRPCServiceGrpc.newStub;
- import hera.ContextProvider;
- import hera.ContextProviderInjectable;
- import hera.annotation.ApiAudience;
- import hera.annotation.ApiStability;
- import hera.api.function.Function1;
- import hera.api.function.Function2;
- import hera.api.model.Block;
- import hera.api.model.BlockHash;
- import hera.api.model.BlockMetadata;
- import hera.api.model.Subscription;
- import hera.client.ChannelInjectable;
- import hera.client.stream.GrpcStreamObserverAdaptor;
- import hera.client.stream.GrpcStreamSubscription;
- import hera.transport.BlockConverterFactory;
- import hera.transport.BlockMetadataConverterFactory;
- import hera.transport.ModelConverter;
- import io.grpc.Context;
- import io.grpc.ManagedChannel;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.Future;
- import lombok.Getter;
- import org.slf4j.Logger;
- import types.AergoRPCServiceGrpc.AergoRPCServiceFutureStub;
- import types.AergoRPCServiceGrpc.AergoRPCServiceStub;
- import types.Blockchain;
- import types.Rpc;
- @ApiAudience.Private
- @ApiStability.Unstable
- public class BlockBaseTemplate implements ChannelInjectable, ContextProviderInjectable {
- protected final transient Logger logger = getLogger(getClass());
- protected final ModelConverter<BlockMetadata, types.Rpc.BlockMetadata> blockMetadataConverter =
- new BlockMetadataConverterFactory().create();
- protected final ModelConverter<Block, Blockchain.Block> blockConverter =
- new BlockConverterFactory().create();
- protected AergoRPCServiceFutureStub aergoService;
- protected AergoRPCServiceStub streamService;
- protected ContextProvider contextProvider;
- @Override
- public void setChannel(final ManagedChannel channel) {
- this.aergoService = newFutureStub(channel);
- this.streamService = newStub(channel);
- }
- @Override
- public void setContextProvider(final ContextProvider contextProvider) {
- this.contextProvider = contextProvider;
- }
- @Getter
- private final Function1<BlockHash, Future<BlockMetadata>> blockMetatdataByHashFunction =
- new Function1<BlockHash, Future<BlockMetadata>>() {
- @Override
- public Future<BlockMetadata> apply(final BlockHash hash) {
- logger.debug("Get block metadata with hash: {}", hash);
- final Rpc.SingleBytes rpcHash = Rpc.SingleBytes.newBuilder()
- .setValue(copyFrom(hash.getBytesValue()))
- .build();
- logger.trace("AergoService getBlockMetadata arg: {}", rpcHash);
- final Future<Rpc.BlockMetadata> rawFuture = aergoService.getBlockMetadata(rpcHash);
- final Future<BlockMetadata> convertedFuture = HerajFutures.transform(rawFuture,
- new Function1<Rpc.BlockMetadata, BlockMetadata>() {
- @Override
- public BlockMetadata apply(final Rpc.BlockMetadata metadata) {
- return blockMetadataConverter.convertToDomainModel(metadata);
- }
- });
- return convertedFuture;
- }
- };
- @Getter
- private final Function1<Long, Future<BlockMetadata>> blockMetadataByHeightFunction =
- new Function1<Long, Future<BlockMetadata>>() {
- @Override
- public Future<BlockMetadata> apply(final Long height) {
- logger.debug("Get block metadata with height: {}", height);
- assertTrue(height >= 0, "Height must >= 0");
- final Rpc.SingleBytes rpcHeight = Rpc.SingleBytes.newBuilder()
- .setValue(copyFrom(height.longValue()))
- .build();
- logger.trace("AergoService getBlockMetadata arg: {}", rpcHeight);
- final Future<Rpc.BlockMetadata> rawFuture = aergoService.getBlockMetadata(rpcHeight);
- final Future<BlockMetadata> convertedFuture =
- HerajFutures.transform(rawFuture, new Function1<Rpc.BlockMetadata, BlockMetadata>() {
- @Override
- public BlockMetadata apply(final Rpc.BlockMetadata metadata) {
- return blockMetadataConverter.convertToDomainModel(metadata);
- }
- });
- return convertedFuture;
- }
- };
- @Getter
- private final Function2<BlockHash, Integer,
- Future<List<BlockMetadata>>> listBlockMetadatasByHashFunction = new Function2<
- BlockHash, Integer, Future<List<BlockMetadata>>>() {
- @Override
- public Future<List<BlockMetadata>> apply(final BlockHash hash,
- final Integer size) {
- logger.debug("List block meta datas with hash: {}, size: {}", hash, size);
- assertTrue(size > 0, "Block list size must be postive");
- final Rpc.ListParams rpcHashAndSize = Rpc.ListParams.newBuilder()
- .setHash(copyFrom(hash.getBytesValue()))
- .setSize(size)
- .build();
- logger.trace("AergoService listBlockMetadata arg: {}", rpcHashAndSize);
- final Future<Rpc.BlockMetadataList> rawFuture =
- aergoService.listBlockMetadata(rpcHashAndSize);
- final Future<List<BlockMetadata>> convertedFuture = HerajFutures.transform(rawFuture,
- new Function1<Rpc.BlockMetadataList, List<BlockMetadata>>() {
- @Override
- public List<BlockMetadata> apply(final Rpc.BlockMetadataList rpcMetadatas) {
- final List<BlockMetadata> blockMetadatas = new ArrayList<>();
- for (final Rpc.BlockMetadata rpcBlockMetadata : rpcMetadatas.getBlocksList()) {
- blockMetadatas
- .add(blockMetadataConverter.convertToDomainModel(rpcBlockMetadata));
- }
- return blockMetadatas;
- }
- });
- return convertedFuture;
- }
- };
- @Getter
- private final Function2<Long, Integer,
- Future<List<BlockMetadata>>> listBlockMetadatasByHeightFunction = new Function2<Long, Integer,
- Future<List<BlockMetadata>>>() {
- @Override
- public Future<List<BlockMetadata>> apply(final Long height,
- final Integer size) {
- logger.debug("List block meta datas with height: {}, size: {}", height, size);
- assertTrue(height >= 0, "Height must >= 0");
- assertTrue(size > 0, "Block list size must be postive");
- final Rpc.ListParams rpcHeightAndSize = Rpc.ListParams.newBuilder()
- .setHeight(height)
- .setSize(size)
- .build();
- logger.trace("AergoService listBlockMetadata arg: {}", rpcHeightAndSize);
- final Future<Rpc.BlockMetadataList> rawFuture =
- aergoService.listBlockMetadata(rpcHeightAndSize);
- final Future<List<BlockMetadata>> convertedFuture = HerajFutures.transform(rawFuture,
- new Function1<Rpc.BlockMetadataList, List<BlockMetadata>>() {
- @Override
- public List<BlockMetadata> apply(final Rpc.BlockMetadataList rpcMetadatas) {
- final List<BlockMetadata> blockHeaders = new ArrayList<>();
- for (final Rpc.BlockMetadata rpcBlockMetadata : rpcMetadatas
- .getBlocksList()) {
- blockHeaders
- .add(blockMetadataConverter.convertToDomainModel(rpcBlockMetadata));
- }
- return blockHeaders;
- }
- });
- return convertedFuture;
- }
- };
- @Getter
- private final Function1<BlockHash, Future<Block>> blockByHashFunction =
- new Function1<BlockHash, Future<Block>>() {
- @Override
- public Future<Block> apply(final BlockHash hash) {
- logger.debug("Get block with hash: {}", hash);
- final Rpc.SingleBytes rpcHash = Rpc.SingleBytes.newBuilder()
- .setValue(copyFrom(hash.getBytesValue()))
- .build();
- logger.trace("AergoService getBlock arg: {}", rpcHash);
- final Future<Blockchain.Block> rawFuture = aergoService.getBlock(rpcHash);
- final Future<Block> convertedFuture = HerajFutures.transform(rawFuture,
- new Function1<Blockchain.Block, Block>() {
- @Override
- public Block apply(final Blockchain.Block block) {
- return blockConverter.convertToDomainModel(block);
- }
- });
- return convertedFuture;
- }
- };
- @Getter
- private final Function1<Long, Future<Block>> blockByHeightFunction =
- new Function1<Long, Future<Block>>() {
- @Override
- public Future<Block> apply(final Long height) {
- logger.debug("Get block with height: {}", height);
- assertTrue(height >= 0, "Height must be >= 0");
- final Rpc.SingleBytes rpcHeight = Rpc.SingleBytes.newBuilder()
- .setValue(copyFrom(height.longValue()))
- .build();
- logger.trace("AergoService getBlock arg: {}", rpcHeight);
- final Future<Blockchain.Block> rawFuture = aergoService.getBlock(rpcHeight);
- final Future<Block> convertedFuture = HerajFutures.transform(rawFuture,
- new Function1<Blockchain.Block, Block>() {
- @Override
- public Block apply(final Blockchain.Block block) {
- return blockConverter.convertToDomainModel(block);
- }
- });
- return convertedFuture;
- }
- };
- @Getter
- private final Function1<hera.api.model.StreamObserver<BlockMetadata>,
- Future<Subscription<BlockMetadata>>> subscribeBlockMetadataFunction = new Function1<
- hera.api.model.StreamObserver<BlockMetadata>,
- Future<Subscription<BlockMetadata>>>() {
- @Override
- public Future<Subscription<BlockMetadata>> apply(
- final hera.api.model.StreamObserver<BlockMetadata> observer) {
- logger.debug("Subscribe block metadata stream with observer {}", observer);
- final Rpc.Empty blockMetadataStreamRequest = Rpc.Empty.newBuilder().build();
- Context.CancellableContext cancellableContext = Context.current().withCancellation();
- final io.grpc.stub.StreamObserver<Rpc.BlockMetadata> adaptor =
- new GrpcStreamObserverAdaptor<Rpc.BlockMetadata, BlockMetadata>(cancellableContext,
- observer, blockMetadataConverter);
- cancellableContext.run(new Runnable() {
- @Override
- public void run() {
- streamService.listBlockMetadataStream(blockMetadataStreamRequest, adaptor);
- }
- });
- final Subscription<BlockMetadata> subscription =
- new GrpcStreamSubscription<>(cancellableContext);
- return HerajFutures.success(subscription);
- }
- };
- @Getter
- private final Function1<hera.api.model.StreamObserver<Block>,
- Future<Subscription<Block>>> subscribeBlockFunction = new Function1<
- hera.api.model.StreamObserver<Block>, Future<Subscription<Block>>>() {
- @Override
- public Future<Subscription<Block>> apply(
- final hera.api.model.StreamObserver<Block> observer) {
- logger.debug("Subscribe block metadata stream with observer {}", observer);
- final Rpc.Empty blockStreamRequest = Rpc.Empty.newBuilder().build();
- Context.CancellableContext cancellableContext = Context.current().withCancellation();
- final io.grpc.stub.StreamObserver<Blockchain.Block> adaptor =
- new GrpcStreamObserverAdaptor<Blockchain.Block, Block>(cancellableContext,
- observer, blockConverter);
- cancellableContext.run(new Runnable() {
- @Override
- public void run() {
- streamService.listBlockStream(blockStreamRequest, adaptor);
- }
- });
- final Subscription<Block> subscription = new GrpcStreamSubscription<>(cancellableContext);
- return HerajFutures.success(subscription);
- }
- };
- }