Kotlin gRPC 예제 - (6) gRPC Server/Client 예제 (Multi Request -> Multi Response)

|

hello.proto

syntax = "proto3";

package com.snowdeer;
option java_outer_classname = "Hello";

service HelloService {
    rpc SayHello (HelloRequest) returns (HelloResponse);
    rpc LotsOfReplies (HelloRequest) returns (stream HelloResponse);
    rpc LotsOfGreetings (stream HelloRequest) returns (HelloResponse);
    rpc BidiHello (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
    string greeting = 1;
}

message HelloResponse {
    string reply = 1;
}


HelloServer.kt

Multi Request에 대한 Single Response를 위해 네 번째 메소드인 BidiHello에 대해서 구현을 해봅니다. 지금부터는 Client 쪽 코드에 BlockingStub이 아닌 AsyncStub을 사용하는 것을 주의합니다.

package com.snowdeer

import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val service = HelloService()
    val server = ServerBuilder
        .forPort(10004)
        .addService(service)
        .build()

    println("[snowdeer] server starts()")
    server.start()
    server.awaitTermination()
}

class HelloService : HelloServiceGrpc.HelloServiceImplBase() {

    override fun sayHello(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] sayHello(${request?.greeting})")

        val response = Hello.HelloResponse.newBuilder().setReply(request?.greeting).build()
        responseObserver?.onNext(response)
        responseObserver?.onCompleted()
    }

    override fun lotsOfReplies(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] lotsOfReplies()")

        for(i in 0 until 5) {
            val resp = Hello.HelloResponse.newBuilder()
                .setReply("hello - $i")
                .build()
            responseObserver?.onNext(resp)

            sleep(1000)
        }
        responseObserver?.onCompleted()
    }

    override fun lotsOfGreetings(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        return object : StreamObserver<Hello.HelloRequest> {

            override fun onNext(value: Hello.HelloRequest?) {
                println("[snowdeer] lotsOfGreetings() - onNext(${value?.greeting})")
            }

            override fun onError(t: Throwable?) {
                println("[snowdeer] lotsOfGreetings() - onError()")
            }

            override fun onCompleted() {
                println("[snowdeer] lotsOfGreetings() - onCompleted()")

                val response = Hello.HelloResponse.newBuilder().setReply("lotsOfGreetings is completed").build()
                responseObserver?.onNext(response)
                responseObserver?.onCompleted()
            }
        }
    }

    override fun bidiHello(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        return object : StreamObserver<Hello.HelloRequest> {

            override fun onNext(value: Hello.HelloRequest?) {
                println("[snowdeer] bidiHello() - onNext(${value?.greeting})")

                val resp = Hello.HelloResponse.newBuilder()
                    .setReply("Response to Client - (${value?.greeting})")
                    .build()
                responseObserver?.onNext(resp)
            }

            override fun onError(t: Throwable?) {
                println("[snowdeer] bidiHello() - onError()")
            }

            override fun onCompleted() {
                println("[snowdeer] bidiHello() - onCompleted()")
                responseObserver?.onCompleted()
            }
        }
    }
}

위 코드에서 lotsOfGreetings 메소드가 리턴하는 것은 Hello.HelloRequest를 처리하는 StreamObserver 객체인 것을 알 수 있습니다.


HelloClient.kt

package com.snowdeer

import io.grpc.ManagedChannelBuilder

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val channel = ManagedChannelBuilder
        .forAddress("localhost", 10004)
        .usePlaintext()
        .build()

    val stub = HelloServiceGrpc.newBlockingStub(channel)
    val asyncStub = HelloServiceGrpc.newStub(channel)

    val requestObserver = asyncStub.bidiHello(ResponseStreamObserver())

    for(i in 0 until 10) {
        requestObserver.onNext(getHelloRequest("bidiHello to Server - $i"))
        sleep(800)
    }
    requestObserver.onCompleted()

    while(true) {
        sleep(1000)
        println("Thread is running.")
    }
}

fun getHelloRequest(greeting: String): Hello.HelloRequest {
    return Hello.HelloRequest.newBuilder()
        .setGreeting(greeting)
        .build()
}

class ResponseStreamObserver : StreamObserver<Hello.HelloResponse> {
    override fun onNext(value: Hello.HelloResponse?) {
        println("[snowdeer] onNext(${value?.reply})")
    }

    override fun onError(t: Throwable?) {
        println("[snowdeer] onError()")
    }

    override fun onCompleted() {
        println("[snowdeer] onCompleted()")
    }
}

Kotlin gRPC 예제 - (5) gRPC Server/Client 예제 (Multi Request -> Single Response)

|

hello.proto

syntax = "proto3";

package com.snowdeer;
option java_outer_classname = "Hello";

service HelloService {
    rpc SayHello (HelloRequest) returns (HelloResponse);
    rpc LotsOfReplies (HelloRequest) returns (stream HelloResponse);
    rpc LotsOfGreetings (stream HelloRequest) returns (HelloResponse);
    rpc BidiHello (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
    string greeting = 1;
}

message HelloResponse {
    string reply = 1;
}


HelloServer.kt

Multi Request에 대한 Single Response를 위해 세 번째 메소드인 LotsOfGreetings에 대해서 구현을 해봅니다. 지금부터는 Client 쪽 코드에 BlockingStub이 아닌 AsyncStub을 사용하는 것을 주의합니다.

package com.snowdeer

import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val service = HelloService()
    val server = ServerBuilder
        .forPort(10004)
        .addService(service)
        .build()

    println("[snowdeer] server starts()")
    server.start()
    server.awaitTermination()
}

class HelloService : HelloServiceGrpc.HelloServiceImplBase() {

    override fun sayHello(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] sayHello(${request?.greeting})")

        val response = Hello.HelloResponse.newBuilder().setReply(request?.greeting).build()
        responseObserver?.onNext(response)
        responseObserver?.onCompleted()
    }

    override fun lotsOfReplies(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] lotsOfReplies()")

        for(i in 0 until 5) {
            val resp = Hello.HelloResponse.newBuilder()
                .setReply("hello - $i")
                .build()
            responseObserver?.onNext(resp)

            sleep(1000)
        }
        responseObserver?.onCompleted()
    }

    override fun lotsOfGreetings(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        return object : StreamObserver<Hello.HelloRequest> {

            override fun onNext(value: Hello.HelloRequest?) {
                println("[snowdeer] lotsOfGreetings() - onNext(${value?.greeting})")
            }

            override fun onError(t: Throwable?) {
                println("[snowdeer] lotsOfGreetings() - onError()")
            }

            override fun onCompleted() {
                println("[snowdeer] lotsOfGreetings() - onCompleted()")

                val response = Hello.HelloResponse.newBuilder().setReply("lotsOfGreetings is completed").build()
                responseObserver?.onNext(response)
                responseObserver?.onCompleted()
            }
        }
    }

    override fun bidiHello(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] bidiHello()")
        return super.bidiHello(responseObserver)
    }
}

위 코드에서 lotsOfGreetings 메소드가 리턴하는 것은 Hello.HelloRequest를 처리하는 StreamObserver 객체인 것을 알 수 있습니다.


HelloClient.kt

package com.snowdeer

import io.grpc.ManagedChannelBuilder

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val channel = ManagedChannelBuilder
        .forAddress("localhost", 10004)
        .usePlaintext()
        .build()
        
    val stub = HelloServiceGrpc.newBlockingStub(channel)
    val asyncStub = HelloServiceGrpc.newStub(channel)

    val requestObserver = asyncStub.lotsOfGreetings(ResponseStreamObserver())

    for(i in 0 until 5) {
        requestObserver.onNext(getHelloRequest("lotsOfGreetings - $i"))
        sleep(1000)
    }
    requestObserver.onCompleted()

    while(true) {
        sleep(2000)
        println("Thread is running.")
    }
}

fun getHelloRequest(greeting: String): Hello.HelloRequest {
    return Hello.HelloRequest.newBuilder()
        .setGreeting(greeting)
        .build()
}

class ResponseStreamObserver : StreamObserver<Hello.HelloResponse> {
    override fun onNext(value: Hello.HelloResponse?) {
        println("[snowdeer] onNext(${value?.reply})")
    }

    override fun onError(t: Throwable?) {
        println("[snowdeer] onError()")
    }

    override fun onCompleted() {
        println("[snowdeer] onCompleted()")
    }
}

Kotlin gRPC 예제 - (4) gRPC Server/Client 예제 (Single Request -> Multi Response)

|

hello.proto

syntax = "proto3";

package com.snowdeer;
option java_outer_classname = "Hello";

service HelloService {
    rpc SayHello (HelloRequest) returns (HelloResponse);
    rpc LotsOfReplies (HelloRequest) returns (stream HelloResponse);
    rpc LotsOfGreetings (stream HelloRequest) returns (HelloResponse);
    rpc BidiHello (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
    string greeting = 1;
}

message HelloResponse {
    string reply = 1;
}


HelloServer.kt

Single Request에 대한 Multi Response를 위해 두 번째 메소드인 lotsOfReplies에 대해서 구현을 해봅니다.

package com.snowdeer

import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val service = HelloService()
    val server = ServerBuilder
        .forPort(10004)
        .addService(service)
        .build()

    println("[snowdeer] server starts()")
    server.start()
    server.awaitTermination()
}

class HelloService : HelloServiceGrpc.HelloServiceImplBase() {

    override fun sayHello(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] sayHello(${request?.greeting})")

        val response = Hello.HelloResponse.newBuilder().setReply(request?.greeting).build()
        responseObserver?.onNext(response)
        responseObserver?.onCompleted()
    }

    override fun lotsOfReplies(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] lotsOfReplies()")

        for(i in 0 until 5) {
            val resp = Hello.HelloResponse.newBuilder()
                .setReply("hello - $i")
                .build()
            responseObserver?.onNext(resp)

            sleep(1000)
        }
        responseObserver?.onCompleted()
    }

    override fun lotsOfGreetings(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] lotsOfGreetings()")
        return super.lotsOfGreetings(responseObserver)
    }

    override fun bidiHello(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] bidiHello()")
        return super.bidiHello(responseObserver)
    }
}


HelloClient.kt

package com.snowdeer

import io.grpc.ManagedChannelBuilder

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val channel = ManagedChannelBuilder
        .forAddress("localhost", 10004)
        .usePlaintext()
        .build()
        
    val stub = HelloServiceGrpc.newBlockingStub(channel)
    val response = stub.lotsOfReplies(getHelloRequest("good morning"))

    response.forEach {
        println("[snowdeer] response: ${it.reply}")
    }

    println("[snowdeer] response.forEach is finished")
}

fun getHelloRequest(greeting: String): Hello.HelloRequest {
    return Hello.HelloRequest.newBuilder()
        .setGreeting(greeting)
        .build()
}

Kotlin gRPC 예제 - (3) gRPC Server/Client 예제 (Single Request -> Single Response)

|

hello.proto

syntax = "proto3";

package com.snowdeer;
option java_outer_classname = "Hello";

service HelloService {
    rpc SayHello (HelloRequest) returns (HelloResponse);
    rpc LotsOfReplies (HelloRequest) returns (stream HelloResponse);
    rpc LotsOfGreetings (stream HelloRequest) returns (HelloResponse);
    rpc BidiHello (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
    string greeting = 1;
}

message HelloResponse {
    string reply = 1;
}


HelloServer.kt

위에서 총 4개의 메소드를 정의했지만, 일단 첫 번째 메소드인 sayHello에 대해서만 구현을 해봅니다.

package com.snowdeer

import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val service = HelloService()
    val server = ServerBuilder
        .forPort(10004)
        .addService(service)
        .build()

    println("[snowdeer] server starts()")
    server.start()
    server.awaitTermination()
}

class HelloService : HelloServiceGrpc.HelloServiceImplBase() {

    override fun sayHello(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] sayHello(${request?.greeting})")

        val response = Hello.HelloResponse.newBuilder().setReply(request?.greeting).build()
        responseObserver?.onNext(response)
        responseObserver?.onCompleted()
    }

    override fun lotsOfReplies(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] lotsOfReplies()")
    }

    override fun lotsOfGreetings(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] lotsOfGreetings()")
        return super.lotsOfGreetings(responseObserver)
    }

    override fun bidiHello(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] bidiHello()")
        return super.bidiHello(responseObserver)
    }
}


HelloClient.kt

package com.snowdeer

import io.grpc.ManagedChannelBuilder

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val channel = ManagedChannelBuilder
        .forAddress("localhost", 10004)
        .usePlaintext()
        .build()
        
    val stub = HelloServiceGrpc.newBlockingStub(channel)
    val response = stub.sayHello(getHelloRequest("hello. snowdeer"))

    println("[snowdeer] response(${response.reply})")
}

fun getHelloRequest(greeting: String): Hello.HelloRequest {
    return Hello.HelloRequest.newBuilder()
        .setGreeting(greeting)
        .build()
}

Kotlin gRPC 예제 - (2) gRPC를 사용하기 위한 build.gradle 및 Protobuf 메시지 빌드

|

gRPC를 사용하기 위한 build.gradle

build.gradle

gRPC를 사용하기 위해서는 필요한 플러그인과 종속성을 추가해야 합니다.

group 'com.snowdeer'
group 'com.snowdeer'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'kotlin'
apply plugin: 'application'
apply plugin: 'com.google.protobuf'
apply plugin: 'idea'

mainClassName = "com.snowdeer.MainKt"

repositories {
    mavenCentral()
}

buildscript {
    ext.kotlin_version = '1.3.61'
    ext.grpc_version = '1.17.0'

    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.8'
    }
}

dependencies {
    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"

    compile "com.google.api.grpc:proto-google-common-protos:0.1.9"
    compile "io.grpc:grpc-netty:${grpc_version}"
    compile "io.grpc:grpc-protobuf:${grpc_version}"
    compile "io.grpc:grpc-stub:${grpc_version}"

    compile("javax.annotation:javax.annotation-api:1.3.2")

    testCompile group: 'junit', name: 'junit', version:'4.12'
}

idea {
    module {
        sourceDirs += file("${projectDir}/build/generated/source/proto/main/java");
        sourceDirs += file("${projectDir}/build/generated/source/proto/main/grpc");
    }
}

compileKotlin.dependsOn ':generateProto'
sourceSets.main.java.srcDirs += 'build/generated/source/proto/main/grpc'
sourceSets.main.java.srcDirs += 'build/generated/source/proto/main/java'

compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}


protobuf {
    protobuf {
        protoc { artifact = "com.google.protobuf:protoc:3.6.1" }
        plugins {
            grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpc_version}" }
        }
        generateProtoTasks {
            all()*.plugins { grpc {} }
        }
    }
}


main/proto/hello.proto

syntax = "proto3";

package com.snowdeer;
option java_outer_classname = "Hello";

service HelloService {
    rpc SayHello (HelloRequest) returns (HelloResponse);
    rpc LotsOfReplies (HelloRequest) returns (stream HelloResponse);
    rpc LotsOfGreetings (stream HelloRequest) returns (HelloResponse);
    rpc BidiHello (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
    string greeting = 1;
}

message HelloResponse {
    string reply = 1;
}

HelloService에는 4개의 메소드가 정의되어 있습니다. 각각은 다음 특징을 가집니다.

  • SayHello: 일반적인 Message 형태로 단일 요청에 단일 응답
  • LotsOfReplies: 단일 요청에 대해 Stream 형태의 응답 전달
  • LotsOfGreetings: 클라이언트에서 서버로 Stream 형태의 요청을 보내며, 서버에서는 단일 응답 전달함
  • BidiHello: 양방향으로 Stream 형태의 요청과 응답을 전달함

위 파일을 추가한다음 Gradle 빌드 과정을 거치면 build/generated/source/proto 디렉토리 아래에 메시지 관련 클래스들이 생성됩니다.