Kotlin gRPC 예제 - (5) gRPC Server/Client 예제 (Multi Request -> Single Response)
27 Dec 2019 | Kotlin gRPC
hello.proto
syntax = "proto3";

package com.snowdeer;

option java_outer_classname = "Hello";

// Request message carrying a single greeting string.
message HelloRequest {
  string greeting = 1;
}

// Response message carrying a single reply string.
message HelloResponse {
  string reply = 1;
}

// Greeting service exposing one RPC per gRPC call shape.
service HelloService {
  // Unary: single request -> single response.
  rpc SayHello (HelloRequest) returns (HelloResponse);

  // Server streaming: single request -> stream of responses.
  rpc LotsOfReplies (HelloRequest) returns (stream HelloResponse);

  // Client streaming: stream of requests -> single response.
  rpc LotsOfGreetings (stream HelloRequest) returns (HelloResponse);

  // Bidirectional streaming: stream of requests <-> stream of responses.
  rpc BidiHello (stream HelloRequest) returns (stream HelloResponse);
}
HelloServer.kt
Multi Request에 대한 Single Response를 위해 세 번째 메소드인 `LotsOfGreetings`를 구현해 봅니다.
지금부터는 Client 쪽 코드에서 `BlockingStub`이 아닌 `AsyncStub`을 사용한다는 점에 주의합니다.
package com.snowdeer

import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver

/**
 * Entry point of the example server: registers [HelloService] on port 10004,
 * starts the server and blocks until it terminates.
 */
fun main(args: Array<String>) {
    println("[snowdeer] main()")

    val service = HelloService()
    val server = ServerBuilder
        .forPort(10004)
        .addService(service)
        .build()

    println("[snowdeer] server starts()")
    server.start()
    server.awaitTermination()
}

/**
 * Server-side implementation of the `HelloService` defined in `hello.proto`.
 *
 * `sayHello`, `lotsOfReplies` and `lotsOfGreetings` are implemented here;
 * `bidiHello` only logs and delegates to the generated base class.
 */
class HelloService : HelloServiceGrpc.HelloServiceImplBase() {

    /** Unary call: echoes the request greeting back as the reply. */
    override fun sayHello(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] sayHello(${request?.greeting})")

        // Generated protobuf setters reject null, so fall back to an empty reply
        // instead of passing a possibly-null greeting straight through.
        val response = Hello.HelloResponse.newBuilder()
            .setReply(request?.greeting.orEmpty())
            .build()

        responseObserver?.onNext(response)
        responseObserver?.onCompleted()
    }

    /** Server-streaming call: emits five replies, one per second, then completes. */
    override fun lotsOfReplies(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] lotsOfReplies()")

        for (i in 0 until 5) {
            val resp = Hello.HelloResponse.newBuilder()
                .setReply("hello - $i")
                .build()
            responseObserver?.onNext(resp)
            // Qualified call: a bare `sleep` is not in scope without an explicit import.
            Thread.sleep(1000)
        }

        responseObserver?.onCompleted()
    }

    /**
     * Client-streaming call.
     *
     * @param responseObserver observer used to send the single response back to the client.
     * @return the observer that receives each streamed [Hello.HelloRequest]; the single
     *   response is sent only when the client signals completion.
     */
    override fun lotsOfGreetings(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        return object : StreamObserver<Hello.HelloRequest> {
            override fun onNext(value: Hello.HelloRequest?) {
                println("[snowdeer] lotsOfGreetings() - onNext(${value?.greeting})")
            }

            override fun onError(t: Throwable?) {
                // Include the cause so a failed stream is not silently swallowed.
                println("[snowdeer] lotsOfGreetings() - onError($t)")
            }

            override fun onCompleted() {
                println("[snowdeer] lotsOfGreetings() - onCompleted()")

                val response = Hello.HelloResponse.newBuilder()
                    .setReply("lotsOfGreetings is completed")
                    .build()
                responseObserver?.onNext(response)
                responseObserver?.onCompleted()
            }
        }
    }

    /** Bidirectional-streaming call: logs the invocation and defers to the generated base class. */
    override fun bidiHello(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] bidiHello()")
        return super.bidiHello(responseObserver)
    }
}
위 코드에서 `lotsOfGreetings` 메소드가 리턴하는 것은 `Hello.HelloRequest`를 처리하는 `StreamObserver` 객체인 것을 알 수 있습니다.
HelloClient.kt
package com.snowdeer

import io.grpc.ManagedChannelBuilder
import io.grpc.stub.StreamObserver
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

/**
 * Entry point of the example client: streams five greetings to the server via the
 * async stub, waits for the single response (or an error), then shuts the channel down.
 */
fun main(args: Array<String>) {
    println("[snowdeer] main()")

    val channel = ManagedChannelBuilder
        .forAddress("localhost", 10004)
        .usePlaintext()
        .build()

    // Client streaming is only available on the async stub.
    val asyncStub = HelloServiceGrpc.newStub(channel)
    val responseObserver = ResponseStreamObserver()
    val requestObserver = asyncStub.lotsOfGreetings(responseObserver)

    for (i in 0 until 5) {
        requestObserver.onNext(getHelloRequest("lotsOfGreetings - $i"))
        // Qualified call: a bare `sleep` is not in scope without an explicit import.
        Thread.sleep(1000)
    }
    requestObserver.onCompleted()

    // The response arrives asynchronously; block until the observer sees a terminal
    // event instead of spinning forever, then release the channel's resources.
    responseObserver.awaitCompletion()
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)
}

/** Builds a [Hello.HelloRequest] carrying the given [greeting]. */
fun getHelloRequest(greeting: String): Hello.HelloRequest {
    return Hello.HelloRequest.newBuilder()
        .setGreeting(greeting)
        .build()
}

/**
 * Logs every callback of the response stream and lets a caller wait until the
 * stream has terminated, either normally or with an error.
 */
class ResponseStreamObserver : StreamObserver<Hello.HelloResponse> {

    // Released once by whichever terminal callback (onError / onCompleted) fires.
    private val finished = CountDownLatch(1)

    /** Blocks the calling thread until [onCompleted] or [onError] has been invoked. */
    fun awaitCompletion() {
        finished.await()
    }

    override fun onNext(value: Hello.HelloResponse?) {
        println("[snowdeer] onNext(${value?.reply})")
    }

    override fun onError(t: Throwable?) {
        // Include the cause so a failed call is not silently swallowed.
        println("[snowdeer] onError($t)")
        finished.countDown()
    }

    override fun onCompleted() {
        println("[snowdeer] onCompleted()")
        finished.countDown()
    }
}