Kotlin gRPC 예제 - (5) gRPC Server/Client 예제 (Multi Request -> Single Response)

|

hello.proto

syntax = "proto3";

package com.snowdeer;
option java_outer_classname = "Hello";

service HelloService {
    rpc SayHello (HelloRequest) returns (HelloResponse);
    rpc LotsOfReplies (HelloRequest) returns (stream HelloResponse);
    rpc LotsOfGreetings (stream HelloRequest) returns (HelloResponse);
    rpc BidiHello (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
    string greeting = 1;
}

message HelloResponse {
    string reply = 1;
}


HelloServer.kt

Multi Request에 대한 Single Response를 위해 세 번째 메소드인 LotsOfGreetings에 대해서 구현을 해봅니다. 지금부터는 Client 쪽 코드에 BlockingStub이 아닌 AsyncStub을 사용하는 것을 주의합니다.

package com.snowdeer

import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val service = HelloService()
    val server = ServerBuilder
        .forPort(10004)
        .addService(service)
        .build()

    println("[snowdeer] server starts()")
    server.start()
    server.awaitTermination()
}

class HelloService : HelloServiceGrpc.HelloServiceImplBase() {

    override fun sayHello(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] sayHello(${request?.greeting})")

        val response = Hello.HelloResponse.newBuilder().setReply(request?.greeting).build()
        responseObserver?.onNext(response)
        responseObserver?.onCompleted()
    }

    override fun lotsOfReplies(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] lotsOfReplies()")

        for(i in 0 until 5) {
            val resp = Hello.HelloResponse.newBuilder()
                .setReply("hello - $i")
                .build()
            responseObserver?.onNext(resp)

            sleep(1000)
        }
        responseObserver?.onCompleted()
    }

    override fun lotsOfGreetings(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        return object : StreamObserver<Hello.HelloRequest> {

            override fun onNext(value: Hello.HelloRequest?) {
                println("[snowdeer] lotsOfGreetings() - onNext(${value?.greeting})")
            }

            override fun onError(t: Throwable?) {
                println("[snowdeer] lotsOfGreetings() - onError()")
            }

            override fun onCompleted() {
                println("[snowdeer] lotsOfGreetings() - onCompleted()")

                val response = Hello.HelloResponse.newBuilder().setReply("lotsOfGreetings is completed").build()
                responseObserver?.onNext(response)
                responseObserver?.onCompleted()
            }
        }
    }

    override fun bidiHello(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] bidiHello()")
        return super.bidiHello(responseObserver)
    }
}

위 코드에서 lotsOfGreetings 메소드가 리턴하는 것은 Hello.HelloRequest를 처리하는 StreamObserver 객체인 것을 알 수 있습니다.


HelloClient.kt

package com.snowdeer

import io.grpc.ManagedChannelBuilder

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val channel = ManagedChannelBuilder
        .forAddress("localhost", 10004)
        .usePlaintext()
        .build()
        
    val stub = HelloServiceGrpc.newBlockingStub(channel)
    val asyncStub = HelloServiceGrpc.newStub(channel)

    val requestObserver = asyncStub.lotsOfGreetings(ResponseStreamObserver())

    for(i in 0 until 5) {
        requestObserver.onNext(getHelloRequest("lotsOfGreetings - $i"))
        sleep(1000)
    }
    requestObserver.onCompleted()

    while(true) {
        sleep(2000)
        println("Thread is running.")
    }
}

fun getHelloRequest(greeting: String): Hello.HelloRequest {
    return Hello.HelloRequest.newBuilder()
        .setGreeting(greeting)
        .build()
}

class ResponseStreamObserver : StreamObserver<Hello.HelloResponse> {
    override fun onNext(value: Hello.HelloResponse?) {
        println("[snowdeer] onNext(${value?.reply})")
    }

    override fun onError(t: Throwable?) {
        println("[snowdeer] onError()")
    }

    override fun onCompleted() {
        println("[snowdeer] onCompleted()")
    }
}

Kotlin gRPC 예제 - (4) gRPC Server/Client 예제 (Single Request -> Multi Response)

|

hello.proto

syntax = "proto3";

package com.snowdeer;
option java_outer_classname = "Hello";

service HelloService {
    rpc SayHello (HelloRequest) returns (HelloResponse);
    rpc LotsOfReplies (HelloRequest) returns (stream HelloResponse);
    rpc LotsOfGreetings (stream HelloRequest) returns (HelloResponse);
    rpc BidiHello (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
    string greeting = 1;
}

message HelloResponse {
    string reply = 1;
}


HelloServer.kt

Single Request에 대한 Multi Response를 위해 두 번째 메소드인 lotsOfReplies에 대해서 구현을 해봅니다.

package com.snowdeer

import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val service = HelloService()
    val server = ServerBuilder
        .forPort(10004)
        .addService(service)
        .build()

    println("[snowdeer] server starts()")
    server.start()
    server.awaitTermination()
}

class HelloService : HelloServiceGrpc.HelloServiceImplBase() {

    override fun sayHello(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] sayHello(${request?.greeting})")

        val response = Hello.HelloResponse.newBuilder().setReply(request?.greeting).build()
        responseObserver?.onNext(response)
        responseObserver?.onCompleted()
    }

    override fun lotsOfReplies(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] lotsOfReplies()")

        for(i in 0 until 5) {
            val resp = Hello.HelloResponse.newBuilder()
                .setReply("hello - $i")
                .build()
            responseObserver?.onNext(resp)

            sleep(1000)
        }
        responseObserver?.onCompleted()
    }

    override fun lotsOfGreetings(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] lotsOfGreetings()")
        return super.lotsOfGreetings(responseObserver)
    }

    override fun bidiHello(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] bidiHello()")
        return super.bidiHello(responseObserver)
    }
}


HelloClient.kt

package com.snowdeer

import io.grpc.ManagedChannelBuilder

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val channel = ManagedChannelBuilder
        .forAddress("localhost", 10004)
        .usePlaintext()
        .build()
        
    val stub = HelloServiceGrpc.newBlockingStub(channel)
    val response = stub.lotsOfReplies(getHelloRequest("good morning"))

    response.forEach {
        println("[snowdeer] response: ${it.reply}")
    }

    println("[snowdeer] response.forEach is finished")
}

fun getHelloRequest(greeting: String): Hello.HelloRequest {
    return Hello.HelloRequest.newBuilder()
        .setGreeting(greeting)
        .build()
}

Kotlin gRPC 예제 - (3) gRPC Server/Client 예제 (Single Request -> Single Response)

|

hello.proto

syntax = "proto3";

package com.snowdeer;
option java_outer_classname = "Hello";

service HelloService {
    rpc SayHello (HelloRequest) returns (HelloResponse);
    rpc LotsOfReplies (HelloRequest) returns (stream HelloResponse);
    rpc LotsOfGreetings (stream HelloRequest) returns (HelloResponse);
    rpc BidiHello (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
    string greeting = 1;
}

message HelloResponse {
    string reply = 1;
}


HelloServer.kt

위에서 총 4개의 메소드를 정의했지만, 일단 첫 번째 메소드인 sayHello에 대해서만 구현을 해봅니다.

package com.snowdeer

import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val service = HelloService()
    val server = ServerBuilder
        .forPort(10004)
        .addService(service)
        .build()

    println("[snowdeer] server starts()")
    server.start()
    server.awaitTermination()
}

class HelloService : HelloServiceGrpc.HelloServiceImplBase() {

    override fun sayHello(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] sayHello(${request?.greeting})")

        val response = Hello.HelloResponse.newBuilder().setReply(request?.greeting).build()
        responseObserver?.onNext(response)
        responseObserver?.onCompleted()
    }

    override fun lotsOfReplies(request: Hello.HelloRequest?, responseObserver: StreamObserver<Hello.HelloResponse>?) {
        println("[snowdeer] lotsOfReplies()")
    }

    override fun lotsOfGreetings(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] lotsOfGreetings()")
        return super.lotsOfGreetings(responseObserver)
    }

    override fun bidiHello(responseObserver: StreamObserver<Hello.HelloResponse>?): StreamObserver<Hello.HelloRequest> {
        println("[snowdeer] bidiHello()")
        return super.bidiHello(responseObserver)
    }
}


HelloClient.kt

package com.snowdeer

import io.grpc.ManagedChannelBuilder

fun main(args: Array<String>) {

    println("[snowdeer] main()")
    val channel = ManagedChannelBuilder
        .forAddress("localhost", 10004)
        .usePlaintext()
        .build()
        
    val stub = HelloServiceGrpc.newBlockingStub(channel)
    val response = stub.sayHello(getHelloRequest("hello. snowdeer"))

    println("[snowdeer] response(${response.reply})")
}

fun getHelloRequest(greeting: String): Hello.HelloRequest {
    return Hello.HelloRequest.newBuilder()
        .setGreeting(greeting)
        .build()
}

Kotlin gRPC 예제 - (2) gRPC를 사용하기 위한 build.gradle 및 Protobuf 메시지 빌드

|

gRPC를 사용하기 위한 build.gradle

build.gradle

gRPC를 사용하기 위해서는 필요한 플러그인과 종속성을 추가해야 합니다.

group 'com.snowdeer'
group 'com.snowdeer'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'kotlin'
apply plugin: 'application'
apply plugin: 'com.google.protobuf'
apply plugin: 'idea'

mainClassName = "com.snowdeer.MainKt"

repositories {
    mavenCentral()
}

buildscript {
    ext.kotlin_version = '1.3.61'
    ext.grpc_version = '1.17.0'

    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.8'
    }
}

dependencies {
    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"

    compile "com.google.api.grpc:proto-google-common-protos:0.1.9"
    compile "io.grpc:grpc-netty:${grpc_version}"
    compile "io.grpc:grpc-protobuf:${grpc_version}"
    compile "io.grpc:grpc-stub:${grpc_version}"

    compile("javax.annotation:javax.annotation-api:1.3.2")

    testCompile group: 'junit', name: 'junit', version:'4.12'
}

idea {
    module {
        sourceDirs += file("${projectDir}/build/generated/source/proto/main/java");
        sourceDirs += file("${projectDir}/build/generated/source/proto/main/grpc");
    }
}

compileKotlin.dependsOn ':generateProto'
sourceSets.main.java.srcDirs += 'build/generated/source/proto/main/grpc'
sourceSets.main.java.srcDirs += 'build/generated/source/proto/main/java'

compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}


protobuf {
    protobuf {
        protoc { artifact = "com.google.protobuf:protoc:3.6.1" }
        plugins {
            grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpc_version}" }
        }
        generateProtoTasks {
            all()*.plugins { grpc {} }
        }
    }
}


main/proto/hello.proto

syntax = "proto3";

package com.snowdeer;
option java_outer_classname = "Hello";

service HelloService {
    rpc SayHello (HelloRequest) returns (HelloResponse);
    rpc LotsOfReplies (HelloRequest) returns (stream HelloResponse);
    rpc LotsOfGreetings (stream HelloRequest) returns (HelloResponse);
    rpc BidiHello (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
    string greeting = 1;
}

message HelloResponse {
    string reply = 1;
}

HelloService에는 4개의 메소드가 정의되어 있습니다. 각각은 다음 특징을 가집니다.

  • SayHello: 일반적인 Message 형태로 단일 요청에 단일 응답
  • LotsOfReplies: 단일 요청에 대해 Stream 형태의 응답 전달
  • LotsOfGreetings: 클라이언트에서 서버로 Stream 형태의 요청을 보내며, 서버에서는 단일 응답 전달함
  • BidiHello: 양방향으로 Stream 형태의 요청과 응답을 전달함

위 파일을 추가한다음 Gradle 빌드 과정을 거치면 build/generated/source/proto 디렉토리 아래에 메시지 관련 클래스들이 생성됩니다.

Kotlin gRPC 예제 - (1) gRPC 소개

|

gRPC

gRPC는 구글이 공개한 RPC(Remote Procedure Call) 오픈 소스이며, CNCF 차원에서 밀고 있습니다. 여기에서 더 많은 정보를 볼 수 있습니다.


HTTP/2의 특징

HTTP/2 기반으로 되어 있으며 HTTP/1에 비해 다음과 같은 장점을 가집니다.

  • HTTP Connection 재사용: 기존 HTTP에서는 매 요청마다 Connection을 새로 가져가지만, gRPC에서는 Channel이라는 형태로 기존 Connection을 유지해서 가져갑니다. 덕분에 매번 Connection하는 Cost가 대폭 줄었습니다.
  • 멀티플렉싱: gRPC는 하나의 Connection에서 여러 요청을 보낼 수 있습니다. 또한 전송하는 데이터의 우선 순위를 정할 수도 있습니다.
  • 메시지 압축: HTTP/2의 헤더 압축 기능을 사용합니다.
  • 서버에서 Push 가능: 한 번 Connection이 맺어진 다음부터는 양방향 통신이 되기 때문에 Push 기능을 자연스럽게 사용할 수 있습니다.


gRPC와 Protobuf

gRPC는 메시지를 전송하는 IDL(Interface Definition Language)을 Protobuf라는 라이브러리를 사용하고 있습니다.

message Person {
  string name = 1;
  int32 id = 2;
  bool has_ponycopter = 3;
}

Protobuf는 위와 같은 형식으로 되어 있으며, .proto 확장자를 가집니다. protoc라는 컴파일러를 이용해서 메시지를 컴파일 할 수 있으며, 컴파일된 결과물로 서버측과 클라이언트측에서 사용할 수 있는 코드가 생성됩니다.

위 메시지 예제에서 name 이나 id와 같은 필드는 각 언어별로 적절한 Setter/Getter 함수를 자동으로 생성해서 제공해줍니다.


service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

위 코드를 보면 servicemessage가 존재합니다. service는 서버와 클라이언트 양측에서 사용할 함수들의 묶음이라 생각할 수 있으며, message는 실제로 주고받는 데이터입니다.