- by example

Protobuf

Protobuf (Protocol Buffers) is an Interface Description Language (IDL). It defines the messages and services for gRPC. For every language that gRPC supports, the protobuf compiler, protoc, generates classes from the definitions in .proto files.

            
              message SomeRequestObject {
                required int32 x = 1;
                required int32 y = 2;
                optional string label = 3;
              }
            
          

Read more about Protobuf here.

Service

A service is a server-side component that handles a specific task. It is defined in protobuf like this:

            
              service SomeService {
                  rpc someUnaryMethod(SomeRequestObject) returns (SomeResponseObject);
                  rpc someServerStreamingMethod(SomeRequestObject) returns (stream SomeResponseObject);
              }
            
          

Protoc generates base classes for the services defined in protobuf. These need to be extended and implemented in each language, like so:

            
              public class SomeService extends SomeServiceGrpc.SomeServiceImplBase {
                  @Override
                  public void someServerStreamingMethod(final SomeRequestObject request, final StreamObserver<SomeResponseObject> responseObserver) {
                      // Stream messages to the client
                      while(someCondition) {
                        responseObserver.onNext(
                            SomeResponseObject.newBuilder()
                                    .setSomeString("Hello world!")
                                    .setSomeInt(42)
                                    .setSomeBoolean(true)
                                    …
                                    .build()
                            );
                      }
                      // Close the stream
                      responseObserver.onCompleted();
                  }
              }
            
          

Stub

Protoc also generates the client stubs for each language. They are concrete classes that need no additional logic, so they can be used as is:

            
              SomeServiceFutureStub stub = SomeServiceGrpc.newFutureStub(channel);
              ListenableFuture<SomeResponseObject> futureResponse = stub.someUnaryMethod(someProtoGeneratedRequestObject);
            
          
            
              SomeServiceBlockingStub stub = SomeServiceGrpc.newBlockingStub(channel);
              Iterator<SomeResponseObject> responses = stub.someServerStreamingMethod(someProtoGeneratedRequestObject);
            
          

Channel

A channel is a connection to a gRPC server. Multiple stubs and multiple services can share one channel. A channel has state, such as idle, connecting, or ready, and a set of configuration options, such as encryption.

            
              Channel channel = ManagedChannelBuilder
               .forAddress("localhost", 8080)
               .usePlaintext() // no TLS; older grpc-java versions took a boolean argument here
               .build();
            
          

Interceptors

An interceptor is a mechanism that can monitor, rewrite, and retry calls. An interceptor can either intercept a call on the client side or on the server side.

            
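              public class SomeInterceptor implements ServerInterceptor {
                  @Override
                  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headersFromClient, ServerCallHandler<ReqT, RespT> next) {
                    // Some interception logic here, e.g. monitor, rewrite, or retry calls
                    // Hand the call on to the next interceptor or to the service itself
                    return next.startCall(call, headersFromClient);
                  }
              }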
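
To show how the `next` handler lets interceptors wrap a call, here is a minimal plain-Java sketch of interceptor chaining. It does not use gRPC's types: `InterceptorChainSketch`, `Handler`, and `Interceptor` are hypothetical names, and requests and responses are simplified to strings. In this sketch the first interceptor in the list runs outermost, which is an arbitrary choice made for illustration.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of interceptor chaining; not gRPC's API.
final class InterceptorChainSketch {
    // Stand-in for a call handler: takes a request, returns a response.
    interface Handler extends Function<String, String> {}

    // An interceptor sees the request and decides how to call the next handler.
    interface Interceptor {
        String intercept(String request, Handler next);
    }

    // Wraps the handler so that the first interceptor in the list runs outermost.
    static Handler intercept(Handler handler, List<Interceptor> interceptors) {
        Handler current = handler;
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            final Interceptor interceptor = interceptors.get(i);
            final Handler next = current;
            current = request -> interceptor.intercept(request, next);
        }
        return current;
    }
}
```

A monitoring interceptor would log and then call `next.apply(request)` unchanged, while a rewriting interceptor would pass a modified request to `next`.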
            
          

Context

            
              public static final Context.Key<String> REQUEST_ID = Context.key("REQUEST_ID");
            
          
            
              Context context = Context.current().withValue(ApiUtil.REQUEST_ID, requestId);
              Context previous = context.attach();
              try {
                  // do stuff inside context…
              } finally {
                  context.detach(previous);
              }
            
          
            
              …
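              } catch (IOException e) {
                  // The request id is read from the current context, not passed as an argument
                  String requestId = ApiUtil.REQUEST_ID.get();
                  throw new RuntimeException("Request id: " + requestId, e);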
              }
            
          

Metadata

            
              // Define metadata key
                public static final Metadata.Key<String> META_REQUEST_ID = Metadata.Key.of("REQUEST_ID", Metadata.ASCII_STRING_MARSHALLER);
            
          
            
                // Send headers to server
                Metadata extraHeaders = new Metadata();
                extraHeaders.put(ApiUtil.META_REQUEST_ID, "api-request-1");
                stub = stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(extraHeaders));
                …
            
          
            
              // Receive headers from client, and send server headers
              ServerServiceDefinition anInterceptedSomeService = ServerInterceptors.intercept(new SomeService(), new ServerInterceptor() {
                  @Override
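                  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headersFromClient, ServerCallHandler<ReqT, RespT> next) {
                      String requestId = headersFromClient.get(ApiUtil.META_REQUEST_ID);
                      // Add the server headers when the response headers are sent; calling
                      // call.sendHeaders() here would make the handler's own sendHeaders() fail
                      return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
                          @Override
                          public void sendHeaders(Metadata headersFromServer) {
                              headersFromServer.put(ApiUtil.META_SERVER_NAME, "Node1");
                              super.sendHeaders(headersFromServer);
                          }
                      }, headersFromClient);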
                  }
              });
            
          
            
              // Receive headers and trailers from server
              …
              AtomicReference<Metadata> headersCapture = new AtomicReference<>();
              AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
              stub = stub.withInterceptors(MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture));
            
          
            
              // Send trailers to client
              ServerServiceDefinition anInterceptedSomeService = ServerInterceptors.intercept(new SomeService(), new ServerInterceptor() {
                  @Override
                  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headersFromClient, ServerCallHandler<ReqT, RespT> next) {
                    // Handle new call (e.g. set headers as in the example above)
                    return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
                        @Override
                        public void close(final Status status, final Metadata trailers) {
                            // Trailers are sent together with the final status
                            trailers.put(ApiUtil.META_EXECUTION_COST, CpuTicker.getElapsedTime());
                            super.close(status, trailers);
                        }
                    }, headersFromClient);
                  }
              });
            
          

Deadline

            
                stub = stub.withDeadline(Deadline.after(5, TimeUnit.SECONDS));
            
          
            
                …
                private static final ScheduledExecutorService CANCELLATION_SERVICE = Executors.newSingleThreadScheduledExecutor();

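                public void someServerStreamingMethod(final SomeRequestObject request, final StreamObserver<SomeResponseObject> responseObserver) {
                  // Assumes the incoming call carries a deadline; getDeadline() returns null otherwise
                  Context.CancellableContext context = Context.current().withDeadline(Context.current().getDeadline(), CANCELLATION_SERVICE);
                  Context previous = context.attach();
                  try {
                    while (!context.isCancelled()) {
                        // do work
                    }
                  } finally {
                    context.detach(previous);
                    context.cancel(null); // release the deadline timer
                  }
                }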
            
          

Load Balancing

The specification document describes a few scenarios and the architecture. A LoadBalancerFactory can be attached to a channel to specify the load balancing strategy, like this:

              
                  channel.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance());
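
To make the round-robin strategy concrete, here is a minimal plain-Java sketch of the selection logic only. It is not gRPC's implementation: `RoundRobinPicker` and the backend address strings are hypothetical, and a real load balancer also tracks connection state and address updates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin selection; not part of gRPC.
final class RoundRobinPicker {
    private final List<String> backends;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinPicker(List<String> backends) {
        if (backends.isEmpty()) {
            throw new IllegalArgumentException("at least one backend is required");
        }
        this.backends = new ArrayList<>(backends);
    }

    // Returns the backends in turn, wrapping around at the end of the list.
    // floorMod keeps the index non-negative even after the counter overflows.
    String pick() {
        int index = Math.floorMod(counter.getAndIncrement(), backends.size());
        return backends.get(index);
    }
}
```

Each call to `pick()` moves on to the next backend, so consecutive calls are spread evenly across the list.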
              
            

Retries

The specification document for retries is quite comprehensive.
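
As a rough illustration of what a retry policy involves, here is a plain-Java sketch of a retry loop with jittered exponential backoff. It is a generic sketch, not gRPC's retry implementation: `RetrySketch` and all parameter names are assumptions made for this example, and it retries on any `RuntimeException` rather than on selected status codes.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

// Hypothetical illustration of retries with exponential backoff; not gRPC's API.
final class RetrySketch {
    // Upper bound for the delay before the nth retry (n starts at 1), capped at maxMillis.
    static long backoffCeilingMillis(int retry, long initialMillis, double multiplier, long maxMillis) {
        double ceiling = initialMillis * Math.pow(multiplier, retry - 1);
        return (long) Math.min(ceiling, maxMillis);
    }

    // Calls the supplier up to maxAttempts times, sleeping a random delay
    // between 0 and the current backoff ceiling after each failure.
    static <T> T callWithRetries(Supplier<T> call, int maxAttempts,
                                 long initialMillis, double multiplier, long maxMillis) {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // out of attempts: surface the last failure
                }
                long ceiling = backoffCeilingMillis(attempt, initialMillis, multiplier, maxMillis);
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextLong(ceiling + 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
    }
}
```

The random jitter spreads retries from many clients over time, so a recovering server is not hit by all of them at once.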

Distributed Tracing

There are some rumours of integrated tracing support in gRPC, but as of now the only out-of-the-box solution I've found is a library that provides interceptors for Zipkin. One could also implement this from scratch following the Ambient Context Pattern, using context for cross-thread tracing, and metadata and interceptors for cross-process tracing.
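
The Ambient Context Pattern mentioned above can be sketched in plain Java as follows. This is only an illustration of the idea, not gRPC's `Context` class or a tracing library: `TraceContext`, the `trace-id` header name, and the use of a `Map` as a stand-in for call metadata are all assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the Ambient Context Pattern; not gRPC's Context class.
final class TraceContext {
    static final String TRACE_HEADER = "trace-id"; // assumed header name

    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    static String current() {
        return CURRENT.get();
    }

    // Runs the task with the given trace id, then restores the previous one.
    static void runWith(String traceId, Runnable task) {
        String previous = CURRENT.get();
        CURRENT.set(traceId);
        try {
            task.run();
        } finally {
            CURRENT.set(previous);
        }
    }

    // Cross-thread: capture the trace id now, restore it on whichever thread runs the task.
    static Runnable wrap(Runnable task) {
        String captured = CURRENT.get();
        return () -> runWith(captured, task);
    }

    // Cross-process, client side: copy the ambient trace id into the outgoing headers.
    static Map<String, String> inject(Map<String, String> headers) {
        Map<String, String> out = new HashMap<>(headers);
        String traceId = CURRENT.get();
        if (traceId != null) {
            out.put(TRACE_HEADER, traceId);
        }
        return out;
    }

    // Cross-process, server side: make the incoming trace id ambient for the handler.
    static void extract(Map<String, String> headers, Runnable handler) {
        runWith(headers.get(TRACE_HEADER), handler);
    }
}
```

In a gRPC setting, `inject` and `extract` would correspond to a client and a server interceptor working on metadata, and `wrap` to what the context does when work moves to another thread.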