gRPC流式服务入门示例

gRPC流式服务入门示例

🗨

在protobuf的proto文件中除了可以定义message格式,还有一种类型时service。Google想通过service来实现rpc的功能,但是并没有在protobuf中实现,而是开放给社区这个接口可以自己实现。同时Google开源了一个官方的实现grpc来生成对应的rpc调用

proto定义

首先在proto文件中定义想要的service

syntax = "proto3";

option java_package = "blog.proto";

message Person{
    string my_name=1;
}

message Result{
    string string=1;
}

service HelloService {
     rpc hello(Person) returns (Result) {}
 }

官方推荐在grpc中使用proto3,上面可以看到定义了一个HelloService,其下定义了hello方法,Person是入参,Result是出参。需要注意的是入参和出参无法使用简单的数据类型不然会报 Expected message type.

编译

proto文件是需要经过protoc来生成对应的开发语言的源码的,在grpc中需要结合使用grpc的插件来实现proto文件中的service生成java服务端/客户端文件。这里沿用之前的gradle插件

protobuf {
    generatedFilesBaseDir = "$projectDir/src/"
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.2.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
        }
    }
}

在protobuf的配置中加入grpc的插件并,运行generateProto之后就可以在src/main下看到一个新的grpc目录,这个目录中就是生成的service接口,生成的文件在客户端和服务端都需要。注意,只有service的接口/类会生成在这个目录,其他的message定义还是保持生成在原来的目录。由于grpc目录不是默认的sourceset,所以编译无法找到对应的生成的java文件,不想每次编译都手动增加目录到编译路径,可以在gradle的build文件中将grpc默认加到sourceset中

sourceSets {
    main {
        java.srcDir 'src/main/grpc'
    }
}

Server端

在Server端需要我们手动重写service的实现并实现Server来启动服务

//服务端的实现继承生成的ImplBase类
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
    @Override
    public void hello(blog.proto.ProtoObj.Person request,
                      io.grpc.stub.StreamObserver<blog.proto.ProtoObj.Result> responseObserver) {
        System.out.println(request.getMyName()+" calling");
        //onNext返回值
        responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());
        //服务结束
        responseObserver.onCompleted();
    }
}

//这是一个简单的Server实现
public class HelloServer {
    private int port;
    private Server server;
    public HelloServer(int port) throws IOException {
        this.port=port;
        //server的builder
        server=ServerBuilder.forPort(port).addService(new HelloServiceImpl()).build();
        //开始服务器
        server.start();
        System.out.println("Server started, listening on " + port );
    }

    private void blockUntilShutdown() throws InterruptedException {
        while(true){
            server.awaitTermination();
        }
    }
    public static void main(String[] args) throws Exception {
        //启动8080端口并block线程
        (new HelloServer(8080)).blockUntilShutdown();
    }
}

之后运行main方法,服务就启动了。

Client端

Client端在生成完java接口后可以构建Stub与服务器通讯

public class HelloClient {
    public static void  main(String[] args){
        //grpc的channel
        ManagedChannel channel=ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
        //构建服务的stub
        HelloServiceGrpc.HelloServiceBlockingStub stub= HelloServiceGrpc.newBlockingStub(channel);
        ProtoObj.Person person=ProtoObj.Person.newBuilder().setMyName("World").build();
        //调用方法
        System.out.println(stub.hello(person).getString());
        //关闭channel,不然服务端会报错“远程主机强迫关闭了一个现有的连接。”
        channel.shutdown();
    }
}

之后运行main方法就可以看到输出hello, World

 

上边简单介绍了grpc的使用方法,并创建了一个方法调用,在grpc中有四种服务类型,下面分别进行介绍

简单rpc

这就是一般的rpc调用,一个请求对象对应一个返回对象

proto语法:
rpc simpleHello(Person) returns (Result) {}

service代码
@Override
public void simpleHello(ProtoObj.Person request,
                  io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
    //返回结果
    responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());
    responseObserver.onCompleted();
}

client代码
@Test
public void  simple() throws InterruptedException {

    final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
    //定义同步阻塞的stub
    HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);

    ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();
    //simple
    System.out.println("---simple rpc---");
    System.out.println(blockingStub.simpleHello(person).getString());
    channel.shutdown();
}

输出
---simple rpc---
hello, World

服务端流式rpc

一个请求对象,服务端可以传回多个结果对象

proto语法
rpc serverStreamHello(Person) returns (stream Result) {}

service代码
@Override
public void serverStreamHello(ProtoObj.Person request,
                        io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
    //返回多个结果
    responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());
    responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello2, "+request.getMyName()).build());
    responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+request.getMyName()).build());
    responseObserver.onCompleted();
}

client代码
@Test
public void serverStream(){

    final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
    //定义同步阻塞的stub
    HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);

    ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();

    //server side
    System.out.println("---server stream rpc---");
    //返回结果是Iterator
    Iterator<ProtoObj.Result> it = blockingStub.serverStreamHello(person);
    while (it.hasNext()) {
        System.out.print(it.next());
    }
    channel.shutdown();
}

输出
---server stream rpc---
string: "hello, World"
string: "hello2, World"
string: "hello3, World"

客户端流式rpc

客户端传入多个请求对象,服务端返回一个响应结果

proto语法
rpc clientStreamHello(stream Person) returns (Result) {}

service代码
@Override
public io.grpc.stub.StreamObserver<ProtoObj.Person> clientStreamHello(
       final io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
   //返回observer应对多个请求对象
   return new StreamObserver<ProtoObj.Person>(){
       private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder();
       @Override
       public void onNext(ProtoObj.Person value) {
            builder.setString(builder.getString() +"," + value.getMyName());
       }

       @Override
       public void onError(Throwable t) {

       }

       @Override
       public void onCompleted() {
           builder.setString("hello"+builder.getString());
           responseObserver.onNext(builder.build());
           responseObserver.onCompleted();
       }
   };
}

client代码
@Test
public void clientStream() throws InterruptedException {
    final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
    //定义异步的stub
    HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);
    ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();

    //client side
    System.out.println("---client stream rpc---");
    StreamObserver<ProtoObj.Result> responseObserver = new StreamObserver<ProtoObj.Result>() {
        @Override
        public void onNext(ProtoObj.Result result) {
            System.out.println("client stream--" + result.getString());
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onCompleted() {
            //关闭channel
            channel.shutdown();
        }
    };
    StreamObserver<ProtoObj.Person> clientStreamObserver = asyncStub.clientStreamHello(responseObserver);
    clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build());
    clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build());
    clientStreamObserver.onCompleted();
    //由于是异步获得结果,所以sleep一秒
    Thread.sleep(1000);
}

输出
---client stream rpc---
client stream--hello,World,World2

双向流式rpc

结合客户端流式rpc和服务端流式rpc,可以传入多个对象,返回多个响应对象

proto语法
rpc biStreamHello(stream Person) returns (stream Result) {}

service代码
@Override
public io.grpc.stub.StreamObserver<ProtoObj.Person> biStreamHello(
        final io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
    //返回observer应对多个请求对象
    return new StreamObserver<ProtoObj.Person>(){
        private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder();
        @Override
        public void onNext(ProtoObj.Person value) {
            responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello2, "+value.getMyName()).build());
            responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+value.getMyName()).build());
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }
    };
}

client代码
@Test
public void bidirectStream() throws InterruptedException {

    final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
    //定义异步的stub
    HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);

    ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();

    //bi stream
    System.out.println("---bidirectional stream rpc---");
    StreamObserver<ProtoObj.Result>  responseObserver = new StreamObserver<ProtoObj.Result>() {
        @Override
        public void onNext(ProtoObj.Result result) {
            System.out.println("bidirectional stream--"+result.getString());
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onCompleted() {
            channel.shutdown();
        }
    };
    StreamObserver<ProtoObj.Person> biStreamObserver=asyncStub.biStreamHello(responseObserver);
    biStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build());
    biStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build());
    biStreamObserver.onCompleted();
    //由于是异步获得结果,所以sleep一秒
    Thread.sleep(1000);

}

输出
---bidirectional stream rpc---
bidirectional stream--hello2, World
bidirectional stream--hello3, World
bidirectional stream--hello2, World2
bidirectional stream--hello3, World2    

总结

grpc通过使用流式的方式,返回/接受多个实例可以用于类似不定长数组的入参和出参


频道:Go