Programming

반응형
Datadog에서 Armeria 모니터링을 위한 Custom


오늘은 Datadog APM에서 Armeria gRPC server 모니터링을 위해 Datadog 수동 계측을 적용했던 개발기를 포스팅 합니다.

개발 과정까지는 Datadog에서 HTTP/2 기반의 armeria-grpc 통신을 자동 계측 하지 못한다는 사실은 전혀 몰랐습니다. AWS Cloud 환경이 구축되어 가며, 플랫폼 엔지니어를 통해 Webflux netty와는 다르게 자동 계측이 안된다는 안타까운 소식을 접하게 되었습니다. 한 가지 희망은 자동 계측은 불가하지만 수동 계측으로 Datadog 모니터링을 할 수 있다며.. Link를 전달받았는데..

내용을 보면 OpenTelemetry library로 armeria-grpc 통신을 추적하고 수집하여 Dagadog Agent를 통해 Datadog Backend로 가져와 데쉬 보드를 구성하는 것으로 이해했는데, 자세한 내용은 아래 링크에서 참고하시기 바랍니다.

Datadog API Docs
https://docs.datadoghq.com/ko/tracing/trace_collection/custom_instrumentation/java/

 

Java Custom Instrumentation with Datadog Library

Instrument your code with the Datadog Java APM tracer.

docs.datadoghq.com

https://docs.datadoghq.com/ko/tracing/trace_collection/opentracing/java/

 

Java OpenTracing Instrumentation

OpenTracing instrumentation for Java

docs.datadoghq.com

Datadog에서 수동 계측을 위한 Sample 가이드를 참고하여, Opentracing library를 사용하여 수동 계측을 구현하였습니다. 사실 Opentracing은 @Deprecate 된 library 란 것을 Production 환경에 적용한 후에 알게 되었네요.. Datadog... API 가이드를 잘 봤어야 하는데, OpenTelemetry로 통합됐다고 하네요. 

그리고, Datadog v1.22 version에서 armeria-grpc 자동 계측을 지원하기위해 dd-trace-java library PR 중이고, 최근 Release 되어, 플랫폼 엔지니어분과 개발기에 적용 후 테스트를 하였지만, 그럼에도 자동 계측이 안되네요.. 언젠가 되길 바라며..

https://github.com/DataDog/dd-trace-java/releases/tag/v1.22.0 

 

Release 1.22.0 · DataDog/dd-trace-java

Breaking Changes ⚠️ This release contains a change in the normalization of resource names with spaces. See #5968 for further notes on details and the feature flag to revert back to the old behavior...

github.com

https://github.com/DataDog/dd-trace-java/pull/5819

 

Add support for armeria-grpc by devinsba · Pull Request #5819 · DataDog/dd-trace-java

What Does This Do Adds support for armeria-grpc

github.com


Datadog 수동 계측 구현은 다음과 같이 진행하였습니다.


Package 구성

build.gradle.kts

dependencies {
    implementation("com.datadoghq:dd-trace-api:1.21.0")
    implementation("io.opentracing:opentracing-api:0.33.0")
    implementation("io.opentracing:opentracing-util:0.33.0")
}


수동 계측 구현

GlobalTracerProvider.kt

OpenTracing interface의 GlobalTracer()를 통해 Noop span이 생성되고 스레드 추적을 시작할 수 있습니다. extracParentSpan()는 현재 실행되는 Active span 정보를 수동으로 생성할 때 end-point를 추가하여 분산 추적을 위해 Root span 정보를 받아오는 메서드입니다.
Kotlin object class로 정의하여 Singleton pattern을 적용하여 GlobalTracer() 인스턴스가 중복 생성되지 않도록 구현하였습니다.

@Component
object GlobalTracerProvider {

    fun getTracer(): Tracer {
        return GlobalTracer.get()
    }

    fun determineSpanType(req: HttpRequest): String {
        return if (req.headers()?.contentType().toString().endsWith("grpc")) {
            "grpc.server"
        } else {
            "http.server"
        }
    }

    fun extractParentSpan(tracer: Tracer, ctx: RequestContext): SpanContext? {
        val headers: Map<String, String>? = ctx.request()?.headers()?.associate { it.key.toString() to it.value }
        return tracer.extract(Format.Builtin.HTTP_HEADERS, TextMapAdapter(headers))
    }
}

https://kotlinlang.org/docs/object-declarations.html#object-declarations-overview

 

Object expressions and declarations | Kotlin

 

kotlinlang.org


GlobalSpanBuilder.kt

@Component
object GlobalSpanBuilder {

    fun buildSpan(tracer: Tracer, spanType: String, parentSpan: SpanContext?, ctx: RequestContext): Span {
        return tracer
            .buildSpan(spanType)
            .asChildOf(parentSpan)  // Root span add
            .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_SERVER)
            .withTag(DDTags.SERVICE_NAME, "armeria-service")
            .withTag(DDTags.RESOURCE_NAME, ctx.method().name + ' ' + ctx.path())
            .withTag(DDTags.HTTP_QUERY, ctx.query())
            .withTag(Tags.HTTP_METHOD, ctx.method().name)
            .withTag(Tags.HTTP_URL, ctx.uri().toString())
            .start()  // 현재 스레드를 span으로 생성
    }
}

DatadogTracerService.kt
Armeria DecoratingHttpServiceFunction class를 상속받고 serve() 메서드를 override하여 해당 스레드의 Span을 생성하고 Root span 정보를 추가하여 분산 추적이 가능한 수동 계측 로직을 구현하였습니다.

@Component
class DatadogTracerService(
    private val globalSpanBuilder: GlobalSpanBuilder,
    private val globalTracerProvider: GlobalTracerProvider
): DecoratingHttpServiceFunction {
    constructor(): this(GlobalSpanBuilder, GlobalTracerProvider)

    override fun serve(delegate: HttpService, ctx: ServiceRequestContext, req: HttpRequest): HttpResponse {
        val tracer = globalTracerProvider.getTracer()
        val spanType = globalTracerProvider.determineSpanType(req)
        val parentSpan = globalTracerProvider.extractParentSpan(tracer, ctx)

        val span = globalSpanBuilder.buildSpan(tracer, spanType, parentSpan, ctx)

        try {
            tracer.activateSpan(span).use {
                return delegate.serve(ctx, req)
            }
        } catch (e: Exception) {
            span.setTag(Tags.ERROR, true)
            span.setTag(DDTags.ERROR_MSG, e.message)
            span.setTag(DDTags.ERROR_TYPE, e.javaClass.name)
            span.setTag(DDTags.ERROR_STACK, e.stackTraceToString())
            throw e
        } finally {
            span.finish()
        }
    }
}

다행히 Production 환경에서 수동 계측한 Span 정보와 Root span 정보가 맵핑되어 armeria-grpc 분산 추적을 할 수 있게 되었습니다. Request 성공은 현재 로직을 Armeria @Decorator pattern으로 서비스 레이어에 추가하여 분산 계측이 가능하였지만, gRPC Exception case는 Datadog dash board에서 추적 되지 않는 문제점을 발견하게 되었습니다...🥶🥶

@Service
@Decorator(DatadogTracerService::class)
class PurchaseGrpcService(private val purchaseComponent: PurchaseComponent): PurchaseServiceGrpcKt.PurchaseServiceCoroutineImplBase() {
}


gRPC Exception Handler Function 구현

DatadogTracerService에서 생성한 Tracer 객체를 ActiveSpan()으로 상태 체크 후 gRPC Exception 메시지를 add 하여 Datadog으로 수집되도록 로직을 구현하였습니다.
Armeria 1.26.0 이상 version에서는 GrpcExceptionHandlerFunction class를 상속받아 apply() 메서드를 override 하도록 업데이트 되었으니 참고하시기 바랍니다. (@Deprecated GrpcStatusFunction)

@Component
class GlobalGrpcExceptionHandler(
    private val globalTracerProvider: GlobalTracerProvider,
): GrpcExceptionHandlerFunction {
    constructor(): this(GlobalTracerProvider)

    override fun apply(ctx: RequestContext, throwable: Throwable, metadata: Metadata): Status {
        val activeSpan = globalTracerProvider.getTracer().activeSpan()

        if (activeSpan != null) {
            with(activeSpan) {
                setTag(Tags.ERROR, true)
                setTag(DDTags.ERROR_MSG, throwable.message)
                setTag(DDTags.ERROR_TYPE, throwable.javaClass.name)
                setTag(DDTags.ERROR_STACK, throwable.stackTraceToString())
            }
            activeSpan.finish()
        }
        return GrpcStatus.fromThrowable(throwable)
    }
}

gRPCService 적용
v1.26 

@Service
@Decorator(DatadogTracerService::class)
@GrpcExceptionHandler(GlobalGrpcExceptionHandler::class)
class PurchaseGrpcService(private val purchaseComponent: PurchaseComponent): PurchaseServiceGrpcKt.PurchaseServiceCoroutineImplBase() {
}

v.1.26 이하
ArmeriaConfiguration에서 addExceptionMapping() 으로 ExceptionHandler() 처리

private fun configureGrpcService(sb: ServerBuilder) {
    val grpcServiceBuilder = GrpcService.builder()
        .addExceptionMapping(Throwable::class.java, GlobalGrpcExceptionHandler())
        .apply {
            grpcServices.forEach { addService(it) }
        }
}


하루빨리.. Armeria에 대한 자동 계측이 되기를 바라는 마음으로.. 🙏🙏.. 개발기 마무리 합니다.



OpenTelemetry와 OpenTracing 비교
https://signoz.io/blog/opentelemetry-vs-opentracing/

 

OpenTelemetry vs. OpenTracing - Decoding the Future of Telemetry Data | SigNoz

If you’re thinking of choosing between OpenTelemetry and OpenTracing, go for OpenTelemetry. OpenTracing is now deprecated, and users of OpenTracing are advised to migrate to OpenTelemetry...

signoz.io

https://uptrace.dev/blog/opentelemetry-vs-opentracing.html

 

OpenTelemetry vs OpenTracing: How to choose?

OpenTelemetry and OpenTracing are two related open source projects that aim to provide observability in modern distributed systems.

uptrace.dev


OpenTelemetry API
https://opentelemetry.io/docs/instrumentation/java/

 

Java

<img width="35" class="img-initial" src="/img/logos/32x32/Java_SDK.svg" alt="Java"> A language-specific implementation of OpenTelemetry in Java.

opentelemetry.io

반응형
반응형
Kotlin + Armeria + gRPC Server (with. Spring Boot) 개발 환경 구성

 

들어가며

오늘은 Armeria framework를 활용하여 gRPC Server 환경 구성을 포스트 하겠습니다.
Java를 주로 사용하다 최근 프로젝트에서 Kotlin기반의 Microservice 프로젝트에서 사용했던 기술 스택을 정리해 봅니다.
먼저 위 기술 조합은 네카라쿠배를 비롯한 기술 기반의 회사에서 많이들 사용하고 있습니다. 아마도 Monolithic환경에서 Microservice 전환 및 대용량 트래픽 처리를 위한 고성능 프레임워크를 필요로 할 경우 검토한다면, 기존 서버 스펙보다 더 낮은 사양과 더 적은 수의 스레드로 동시성 처리를 위한 High performance에 적합한 조합이라고 생각이 됩니다. 

Armeria는 LINE에서 개발한 고성능 비동기 통신이 가능한 프레임워크로 Spring의 Webflux와 비교되기도 하고 Armeria + Webflux 조합으로도 많이 도입하고 있는 추세입니다.
저의 경우도 프로젝트 진행 과정에서 요구사항의 변경으로 인해 gRPC serving과 REST serving을 모두 처리해야 했던 상황이라 고민 끝에 Armeria를 도입하게 되었습니다.
웃픈 일로 Armeria 도입을 고민하던 중 Slack 채널에서 trustin님과 DM을 통해 조금 더 Armeria 도입에 확신이 들었고, Spring Webflux + grpc-gateway 조합으로 REST 서비스 제공하려던 마음을 접고 Armeria로 입문하게 되었습니다.

Armeria의 자세한 장점 및 설명은 아래 링크에서 Deep 하게 확인해 보시기 바랍니다.

LINE 
https://engineering.linecorp.com/ko/blog/introduce-armeria

 

Armeria를 소개합니다

LINE DEV Meetup #11 'LINE 서버 개발자들이 말한다! Armeria 아직도 안 써요?'에서 이희승 님이 발표하신 'Three Principles of a Good Framework' 세션 내용을 옮긴 글입니다. 안녕하세요. LINE에서 Arme...

engineering.linecorp.com

DEVIEW with Armeria
https://deview.kr/2019/schedule/283

 

Armeria: 어디서나 잘 어울리는 마이크로서비스 프레임워크

발표자 : 이희승

deview.kr

Armeria API
https://armeria.dev/

 

Armeria – Your go-to microservice framework

Armeria is your go-to microservice framework for any situation. You can build any type of microservice leveraging your favorite technologies, including gRPC, Thrift, Kotlin, Retrofit, Reactive Streams, Spring Boot and Dropwizard. Brought to you by the crea

armeria.dev

Armeria Chat
https://armeria.dev/s/discord

 

Redirecting

 

armeria.dev

이제 본론으로 들어와 아래 순서로 포스트를 시작하겠습니다.

개발 환경 구성 순서

  1. 프로젝트 생성
  2. build.gradle.kts 구성
  3. Armeria 서버 구성
  4. Protobuf 파일 생성
  5. 서비스 레이어 구성
  6. 비즈니스 로직 작성
  7. gRPC server reflection 테스트

 

1. 프로젝트 생성

Spring Initializr를 선택하여 Spring Boot와도 쉽게 구성되는 조합으로 진행하시는 것을 추천합니다.

 

2. 빌드 파일 설정(build.gradle.kts)

spec

  • Spring Boot 3.2.0
  • Kotlin 1.9.20
  • gRPC kotlin 1.4.0
  • Armeria 1.26.3
  • Protobuf 3.24.0

plugins

plugins {
    // spring boot default
    id("org.springframework.boot") version "3.2.0"
    id("io.spring.dependency-management") version "1.1.4"
	
    // protobuf plugins dependencySpec 지정
    id("com.google.protobuf") version "0.9.2"
    
    // kotlin default
    kotlin("jvm") version "1.9.20"
    kotlin("plugin.spring") version "1.9.20"
    kotlin("kapt") version "1.9.20"
	
    // kotlin 직렬화를 위해 추가
    kotlin("plugin.serialization") version "1.9.20"
}

dependencies

object Version {
    const val grpc = "1.58.0"
    const val grpcKotlin = "1.4.0"
    const val protoc = "3.24.0"
    const val kotest = "5.5.5"
    const val armeria = "1.26.3"
}

dependencies {
    // Spring boot
    implementation("org.springframework.boot:spring-boot-starter")
    
    // reactive mongodb 연동을 위한
    implementation("org.springframework.boot:spring-boot-starter-data-mongodb-reactive")

    // Kotlin
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    
    // Kotlin 직렬화를 위한
    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")

    // Armeria
    implementation(platform("com.linecorp.armeria:armeria-bom:${Version.armeria}"))
    implementation("com.linecorp.armeria:armeria-grpc")
    implementation("com.linecorp.armeria:armeria-kotlin")
    implementation("com.linecorp.armeria:armeria-spring-boot3-autoconfigure")

    // gRPC stub
    implementation("io.grpc:grpc-kotlin-stub:${Version.grpcKotlin}")
    
    // protobuf
    implementation("com.google.protobuf:protobuf-kotlin:${Version.protoc}")


    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

ProtobufPlugin

import com.google.protobuf.gradle.*

plugins.withType<ProtobufPlugin> {
	
    // protobuf file 소스 경로 지정
    sourceSets {
        main {
            proto {
                srcDir("/proto/order")
            }
        }
    }
	
    // protobuf compile, code generate 를 위한 설정
    protobuf {
        protoc {
            artifact = "com.google.protobuf:protoc:${Version.protoc}"
        }

        plugins {
            id("grpc") {
                artifact = "io.grpc:protoc-gen-grpc-java:${Version.grpc}"
            }
            id("grpckt") {
                artifact = "io.grpc:protoc-gen-grpc-kotlin:${Version.grpcKotlin}:jdk8@jar"
            }
        }

        generateProtoTasks {
            ofSourceSet("main").forEach{
                it.plugins {
                    id("grpc")
                    id("grpckt")
                }
                it.builtins {
                    id("kotlin")
                }
            }
        }
    }
}

 

3. Armeria 서버 구성

ArmeriaConfiguration 생성

Spring Boot와 통합 환경을 구성하기 때문에 @Configuration, @Bean을 주입 하여 서버를 구성하는데, 객체의 Life cycle 관리는 Spring이 담당하고, RPC 서버 역할은 Armeria가 담당하게 됩니다.
Armeria 서버 구동 시 기본적인 Logging 설정, 브라우저에서 테스트 가능한 환경을 위한 Docs 설정, gRPC 서비스 바인딩을 위한 최소한의 설정만 추가하였고, 추가적으로 Armeria에서 기본으로 제공하는 Health check 및 애플리케이션 모니터링을 위한 metrics, prometheus 설정을 바인딩 하였습니다.

ArmeriaConfiguration 구성 파일

@Configuration
class ArmeriaConfiguration(
    private val grpcServices: List<BindableService>,
) {

    @Bean
    fun armeriaServerConfigurator(): ArmeriaServerConfigurator = ArmeriaServerConfigurator {
            // access log 설정
            configureAccessLog(it)
            
            // request/response 로깅 설정
            configureLogging(it)
            
            // gRPC 서비스 바인딩을 위한 설정
            configureGrpcService(it)
            
            // armeria Docs 설정 
            configureDocService(it)
        }

    private fun configureAccessLog(sb: ServerBuilder) =
        sb.accessLogWriter(AccessLogWriter.combined(), true)

    private fun configureLogging(sb: ServerBuilder) {
        val logFormatter = LogFormatter.builderForText()
            .requestHeadersSanitizer { _: RequestContext, headers: HttpHeaders ->
                headers.toBuilder()
                    .removeAndThen(HttpHeaderNames.CONTENT_TYPE)
                    .build().toString()
            }
            .build()

        val logWriter = LogWriter.builder()
            .requestLogLevel(LogLevel.INFO)
            .successfulResponseLogLevel(LogLevel.INFO)
            .failureResponseLogLevel(LogLevel.ERROR)
            .logFormatter(logFormatter)
            .build()

        val loggingDecorator = LoggingService.builder()
            .logWriter(logWriter)
            .newDecorator()
        sb.decorator(loggingDecorator)
    }

    private fun configureDocService(sb: ServerBuilder) {
        val docServiceBuilder = DocService.builder()
            .exclude(DocServiceFilter.ofServiceName(HealthGrpc.SERVICE_NAME))
            .build()
        sb.serviceUnder("/docs", docServiceBuilder)
    }

    private fun configureGrpcService(sb: ServerBuilder) {
        val grpcServiceBuilder = GrpcService.builder()
            .apply {
                grpcServices.forEach { addService(it) }
            }
            .addService(ProtoReflectionService.newInstance())  // gRPC reflection service
            .supportedSerializationFormats(GrpcSerializationFormats.values())
            .enableHealthCheckService(true)
            .enableUnframedRequests(true)
            .enableHttpJsonTranscoding(true)  //HTTP/JSON to gRPC transcoding
            .build()
        sb.service(grpcServiceBuilder)
        sb.decorator(MetricCollectingService.newDecorator(GrpcMeterIdPrefixFunction.of("grpc.service")))
            .build()
    }
}

application.yml 파일 생성

서버 구성을 위한 Port 설정 및 서버 기동시 설정 그리고 Docs 및 Health check를 위한 internal-services을 추가합니다.

armeria:
  ports:
    - port: 2222
      protocols:
        - HTTP
  graceful-shutdown:
    quiet-period-millis: 2000
    timeout-millis: 3000
  request-timeout: 3000
  maxNumConnections: 500 #bytes (default: 1000)
  internal-services:
    port: 2223
    include : metrics, docs, health
    enable-metrics: true
    metrics-path: '/internal'
    health-check-path: '/internal/health'
    docs-path: '/internal/docs'
    prometheus:
      enabled: true
      path: '/internal/prometheus'

여기까지 진행 되셨다면, Armeria server 구동 시 아래와 같은 로깅 정보를 확인할 수 있습니다.

Armeria Docs http://127.0.0.1:2222/docs로 접속하면 아래와 같이 노출됩니다.

처음 Docs Page를 로딩하면 Services 탭은 바인딩 되지 않습니다.
위 스크린샷은 gRPC Stub 메서드를 구현한 후 configureDocService() 메서드에 serviceUnder를 
serverBuilder로 바인딩한 화면이니 참고하시기 바라며, 순서대로 진행하면 gRPC 및 Annotated Services를 Docs에서 확인할 수 있습니다.

private fun configureDocService(sb: ServerBuilder) {
    val docServiceBuilder = DocService.builder()
        .build()
    sb.serviceUnder("/docs", docServiceBuilder)
}

 

4. Protobuf 생성

실제 개발 환경에서는 protobuf를 git submodule로 구성하여 Server <> Client에서 동일한 message 규격으로 사용하여 gRPC Request/Response 처리를 진행하게 됩니다. Demo 환경에서는 동일 프로젝트에 구성하여 진행합니다.

HTTP/2 Protocol 기반의 gRPC Interface에서 Server <> Client 간의 통신을 하기 위해서는 message 계층인 protobuf는  반드시 필요합니다. 개인적인 생각으로는 gRPC가 REST 대비 빠른 성능과 높은 처리량의 이점도 있지만, Interface를 위해 protobuf를 추가로 개발하고 관리해야 하는 다소 번거로운 부분은 오히려 Microservice module 간의  Deep coupling이 발생하는 건 아닌지 모르겠네요...

protobuf 작성

// proto file 버전 명시
syntax = "proto3";

// java에서 import시 package 경로
package com.armeria.purchase.v1;

// empty 리턴을 위한 추가
import "google/protobuf/empty.proto";

// request/response parameter message
message GetPurchaseRequest {
  string purchase_no = 1;
  string customer_id = 2;
  string purchase_date = 3;
}

message GetPurchaseResponse {
  Purchase purchase = 1;
}

message ListPurchasesRequest {
  string purchase_no = 1;
  string customer_id = 2;
  string purchase_date = 3;
}

message ListPurchasesResponse {
  repeated Purchase purchases = 1;
  string has_next = 2;
}

message Purchase {
  string purchase_no = 1;
  string customer_id = 2;
  string customer_name = 3;
  int64 product_id = 4;
  string product_name = 5;
  int32 price = 6;
  int32 quantity = 7;
  string purchase_date = 8;
}

// grpc 호출 서비스 
service PurchaseService {
  rpc GetPurchase(GetPurchaseRequest) returns (GetPurchaseResponse) {}
  rpc ListPurchases(ListPurchasesRequest) returns (ListPurchasesResponse) {}
}

protobuf 작성이 완료되면 Gradle build를 통해 generateProto를 실행하여 Compile을 진행합니다. Project 탭에 Build folder를 클릭하면 generated 된 proto 파일을 확인할 수 있습니다.

이제 비즈니스 로직을 작성하는 과정만 남았고 Stub 메서드 구현이 완료되면 HTTP/2 기반의 gRPC server 테스트를 진행할 수 있습니다.

protobuf 작성 시 표준 네이밍 규칙 및 자세한 규약은 아래 링크에서 가이드를 확인하고 작성하시기 바랍니다.

Protocol Buffers 네이밍 규칙
https://cloud.google.com/apis/design/naming_convention?hl=ko

 

이름 지정 규칙  |  Cloud API  |  Google Cloud

의견 보내기 이름 지정 규칙 컬렉션을 사용해 정리하기 내 환경설정을 기준으로 콘텐츠를 저장하고 분류하세요. 다수의 API에서 오랜 시간이 지나더라도 일관적인 개발자 환경을 제공하려면 API

cloud.google.com

Protocol Buffers Language Guide
https://protobuf.dev/programming-guides/proto3/

 

Language Guide (proto 3)

Covers how to use the version 3 of Protocol Buffers in your project.

protobuf.dev

Introduction to gRPC
https://grpc.io/docs/what-is-grpc/introduction/

 

Introduction to gRPC

An introduction to gRPC and protocol buffers.

grpc.io

 

5. Service Layer 구성

RPC 통신을 위해 작성한 protobuf를 연동하기 위한 서비스 레이어를 작성합니다.

 

6. Business Logic 구성

RPC Service interface implement

PurchaseServiceGrpc로 Compile 된 Class의 Stub 객체를 Kotlin suspend func을 사용하여 Service 레이어로 상속 받아 메서드를 구현합니다. 코드 순서를 보면 GetPurchaseRequest message로부터 요청되고 Component를 통해 getPurchase() 메서드가 호출됩니다. 이후 findeOneByPurchaseNo() 메서드가 호출되고 응답받은 객체는 다음과 같은 과정으로 변환되어 (Domain -> DTO -> GetPurchaseResponse) Builder pattern으로 add하여 Return 됩니다.
이번 Demo에서는 Armeria server 구성을 중점적으로 만들어 봤기 때문에 Client 역할은 RPC 테스트 환경인 Postman or Docs를 활용하여 테스트를 진행하였고, Client에서 Stub 객체의 getPurchase() 메서드를 호출하는 Client demo는 추후에 구성해보고자 합니다. 

PurchaseGrpcService
PurchaseServiceGrpcKt를 상속받아 suspend func으로 getPurchase(), listPurchases() 메서드를 override 합니다.

@Service // spring service layer 등록
@GrpcExceptionHandler(GlobalGrpcExceptionHandler::class) // armeria grpc 예외 처리를 위한 추가
class PurchaseGrpcService(private val purchaseComponent: PurchaseComponent): PurchaseServiceGrpcKt.PurchaseServiceCoroutineImplBase() {

    override suspend fun getPurchase(request: PurchaseOuterClass.GetPurchaseRequest): PurchaseOuterClass.GetPurchaseResponse {
        val purchase: PurchaseOuterClass.Purchase? = purchaseComponent.getPurchase(request.purchaseNo)?.let {
            toPurchaseProto(it)
        }

        return PurchaseOuterClass.GetPurchaseResponse.newBuilder()
            .setPurchase(purchase)
            .build()
    }

    override suspend fun listPurchases(request: PurchaseOuterClass.ListPurchasesRequest): PurchaseOuterClass.ListPurchasesResponse {
        val purchaseResponse: PurchaseResponse = purchaseComponent.listPurchases(request.purchaseNo)

        val purchases: List<PurchaseOuterClass.Purchase> = purchaseResponse.contents
            .mapNotNull { toPurchaseProto(it) }

        return ListPurchasesResponse.newBuilder()
            .addAllPurchases(purchases)
            .setHasNext(purchaseResponse.hasNext)
            .build()
    }
}

 

PurchaseComponent
Repository 호출 후 Return 된 응답 객체를 toPurchaseDTO() 메서드를 통하여 객체 간 맵핑으로 변환합니다.

@Component
class PurchaseComponent(private val purchaseRepository: PurchaseRepository) {
    suspend fun getPurchase(purchaseNo: String): PurchaseDTO {
        // toPurchaseDTO 메서드를 통해 domain -> dto 변환 처리
        return toPurchaseDTO(purchaseRepository.findOneByPurchaseNo(purchaseNo))
    }

    suspend fun listPurchases(purchaseNo: String): PurchaseResponse {
        return purchaseRepository.findPurchaseAll(purchaseNo).map { toPurchaseDTO(it) }
            .let {
                PurchaseResponse(
                    contents = it.content,
                    hasNext = if(it.hasNext()) "Y" else "N",
                )
            }
    }
}

PurchaseRepository
실제 mongodb 연동까지 구성하지는 않았고, JSON 파일 생성 후 Resource로 응답 결과를 받아 처리되도록 구현하였습니다. Kotlin serialization을 사용하여 JSON -> Purchase domain object로 역직렬화를 하였는데 Data Class에 @Serializable 애노테이션만 추가해 주면 쉽게 직렬화/역직렬화를 구현할 수 있습니다.

@Repository
class PurchaseRepository(
    private val resourceLoader: ResourceLoader
) {
    suspend fun findOneByPurchaseNo(purchaseNo: String): Purchase {
        val resource = resourceLoader.getResource("classpath:dummy/purchase-dummy.json")
        val jsonContent = resource.inputStream.bufferedReader().use { it.readText() }
        
        //kotlin serialization 메서드로 json -> Purchase 객체 역직렬화
        return Json.decodeFromString(jsonContent)
    }

    suspend fun findPurchaseAll(purchseNo: String): Slice<Purchase> {
        val pageable: Pageable = PageRequest.of(1, 10, Sort.Direction.DESC, "purchaseDate")
        val resource = resourceLoader.getResource("classpath:dummy/purchases-dummy.json")
        val jsonContent = resource.inputStream.bufferedReader().use { it.readText() }
        val purchases: List<Purchase> = Json.decodeFromString(jsonContent)

        val hasNext = purchases.size > 10
        val newPurchases = if (hasNext) {
            purchases.dropLast(1)
        } else {
            purchases
        }

        return SliceImpl(newPurchases, pageable, hasNext)
    }
}

Kotlin serialization
https://kotlinlang.org/docs/serialization.html

 

Serialization | Kotlin

 

kotlinlang.org

 

7. gRPC Server Test

먼저 gRPC의 경우 HTTP/2 protocol 기반의 테스트 환경에서 가능하기에 일반 브라우저에서는 테스트 진행이 불가능하고 gRPC reflection service를 사용하여 가능합니다.
다양한 방법 중 본인의 취향에 맞는 한 가지를 선택하면 되고, 참고로 Postman 과 같은 Client 역할을 대신하여 gRPC Stub service를 테스트를 할 경우, 아래 설정의 ProtoReflectionService.newInstance()를 ServerBuilder에 바인딩해야 실제 Client module이 아닌 Postman을 활용하여 테스트를 수행할 수 있습니다.

private fun configureGrpcService(sb: ServerBuilder) {
        val grpcServiceBuilder = GrpcService.builder()
            .apply {
                grpcServices.forEach { addService(it) }
            }
            .addService(ProtoReflectionService.newInstance())  // gRPC reflection service
            .enableUnframedRequests(true)
            .build()
        sb.service(grpcServiceBuilder)
            .build()
    }
  • Armeia Docs
  • Postman
  • Bloomrpc
  • intelliJ idea

Armeria Docs Test
Armeria를 사용하면 기본적으로 사용할 수 있는 막강한 Docs에서 Debuging을 수행할 수 있습니다.

Postman gRPC support
Server reflection을 클릭하면 proto에 작성한 Service 목록이 바인딩되고, message에 요청값을  입력 후 invoke를 통하여 RPC 통신이 수행되고 응답 결과를 확인할 수 있습니다.

Postman gRPC support
https://blog.postman.com/latest-advancements-to-postmans-grpc-support/

 

Latest advancements to Postman’s gRPC support | Postman Blog

See how Postman engineers have upgraded our gRPC support. New features are bringing out the best in what gRPC has to offer.

blog.postman.com

 

결론

지금까지 Kotlin + Armeria + gRPC (with. Spring Boot) 기반의 애플리케이션 Demo 버전을 만들어봤습니다. 고성능의 RPC 지원을 위한 Microservice framework가 필요하다면 Armeria를 한 번 검토해 보시는 것도 좋을 것 같습니다. 작은 서비스로 시작해 다양한 프로토콜에 대한 확장까지 이 모든 걸 단일 Server, 단일 port로 처리할 수 있는 막강한 framework라는 생각이 듭니다. 

Demo version은 아래 github link에서 Clone 하시기 바랍니다.

github

 

GitHub - zinzzas/kotlin-armeria-grpc: kotlin + armeria + grpc

kotlin + armeria + grpc. Contribute to zinzzas/kotlin-armeria-grpc development by creating an account on GitHub.

github.com

 

반응형
반응형

Stream anyMatch() vs filter.count > 0 성능 비교

개요

Java에서 스트림으로 작업할 때 조건자 or 기준과 일치하는 항목이 하나 이상 있는지 확인하는 로직을 작성할 때가 있습니다.
anyMatch()와 filter().count > 0 Stream API는 동일한 처리를 하지만 anyMatch()를 사용하는 것이 더 안전하고 성능상에 장점이 있는데 그 차이점에 대해 알아보고자 합니다.

차이점

가장 큰 차이점은 anyMatch()의 경우 Stream 요소에 일치하는 첫번째 항목이 존재하면, Stream을 중지하고 결과를 반환하게 됩니다.
반면에 count()의 경우는 모든 Stream 요소를 순회하고 그 결과인 count를 반환하게 됩니다.

Stream 요소가 작을 경우는 큰 차이가 없겠지만, Stream의 요소가 크면 클수록 성능상에 영향을 미칠 수 있습니다.

Code

일치하는 요소가 존재하면 Stream 종료
return component.getContents()
                .stream()
                .anyMatch(x -> "v1000".contains(x.getContentsCode())) ? "Y" : "N";
Contents Size까지 스트림 순회하고 count 반환
return component.getContents()
                .stream()
                .filter(x -> "v1000".contains(x.getContentsCode()))
                .count() > 0 ? "Y" : "N";
반응형
반응형
MongoDB find Query 예외

오늘은 MongoDB find query를 사용하면서 예상치 못한 예외에 직면했던 내용을 공유하려고 합니다.
먼저 Toy project 구성한 환경입니다.

  1. Spring Boot
  2. Webflux
  3. ReactiveMongodb
  4. Kotlin

The Probloem

Caused by: org.springframework.data.mongodb.UncategorizedMongoDbException: Query failed with error code 2 and error message 'Field 'locale' is invalid in: { locale: "order" }' on server localhost:27017; nested exception is com.mongodb.MongoQueryException: Query failed with error code 2 and error message 'Field 'locale' is invalid in: { locale: "order" }' on server localhost:27017

Order collection에서 locale field?? 제가 사용하는 field는 아닌데?? 정말이지.. 원인을 알 수가 없었고.. 너무 생뚱맞은 예외 메시지입니다. 원인은 바로... 아래 그림을 보시면.. Data Class에 MongoDB document를 선언했는데, 자동 완성 기능을 사용하다보니 바로.. 오타가 들어갔습니다.
Collection을 Collation으로 Collation는 MongoDB에서 데이터 정렬에 사용하는 예약어 입니다........
정말 어처구니없는 실수로... 2시간을 날려버린 거 같네요..

The Soultion

오타를 막기 위한 해결책 몇 가지 억지로 적어봅니다.

  1. 별칭 없이 작성하기..
    @Document("order")
    data class Order
  2. @Document 자체를 생략해도.. data class name을 가지고 Document 유추가 가능하기에.. 좀 억지 같네요..ㅎㅎ
    data class Order

그럼에도 가장 좋은 해결책은 오타가 발생하지 않도록 자동완성 사용 시 항상 주의를 기울이는 습관이 필요할 거 같습니다.

오늘은 오타로 인해 고생했던 내용을 공유하였습니다.

반응형
반응형
반응형

+ Recent posts

반응형