ElasticSearch를 단계적으로 도입합니다.
Docker Compose에 ES 추가

SpringBoot 의존성 설정
build.gradle

application.yml

common/config/ElasticsearchStartupVerifier.java
package com.marketengine.backend.common.config;
import java.io.IOException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.InfoResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
 * Step 1: startup probe that confirms the application can reach Elasticsearch.
 *
 * <p>Registered only when {@code marketengine.elasticsearch.verify-on-startup} is {@code true}
 * or absent. Once the application is ready it requests the cluster info and logs the cluster
 * name and version; search features are added in later steps.
 */
@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "marketengine.elasticsearch.verify-on-startup", havingValue = "true", matchIfMissing = true)
public class ElasticsearchStartupVerifier implements ApplicationListener<ApplicationReadyEvent> {

    /** Failure text shown when the info request fails with an I/O error. */
    private static final String UNREACHABLE_MESSAGE =
            "Elasticsearch is not reachable at spring.elasticsearch.uris. "
                    + "Start the cluster (e.g. docker compose up -d elasticsearch) "
                    + "or set marketengine.elasticsearch.verify-on-startup=false.";

    private final ElasticsearchClient elasticsearchClient;

    /**
     * Calls the Elasticsearch info endpoint once and logs the result.
     *
     * @param event the application-ready event (unused)
     * @throws IllegalStateException if the info request fails with an {@link IOException}
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        final InfoResponse clusterInfo;
        try {
            clusterInfo = elasticsearchClient.info();
        } catch (IOException exception) {
            // Abort startup with an actionable hint; the original cause is preserved.
            throw new IllegalStateException(UNREACHABLE_MESSAGE, exception);
        }

        String clusterName = clusterInfo.clusterName();
        String versionNumber = clusterInfo.version().number();
        log.info("Elasticsearch connected: cluster={}, version={}", clusterName, versionNumber);
    }
}
클러스터 연결만 확인합니다.

백엔드 구동 - ElasticSearch 연결 확인했습니다.
이제 Document 모델을 구현합니다.
Document는 Elasticsearch에 물리적으로 적재되는 JSON 데이터입니다.
디렉토리: product/infrastructure/search
ProductDocument.java
package com.marketengine.backend.product.infrastructure.search;
import java.math.BigDecimal;
import java.time.OffsetDateTime;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.Setting;
import com.marketengine.backend.product.domain.Product;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
 * Elasticsearch read model for product list/search (step 2: mapping only; no queries yet).
 *
 * <p>Stored in the {@value #INDEX_NAME} index with zero replicas. The document id is the
 * value of {@link Product#getId()} of the source product.
 */
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Document(indexName = ProductDocument.INDEX_NAME)
@Setting(replicas = 0)
public class ProductDocument {

    /** Name of the Elasticsearch index that holds product documents. */
    public static final String INDEX_NAME = "products";

    /** Document id; taken from the product id. */
    @Id
    private Long id;

    /** Product name, mapped as a text field. */
    @Field(type = FieldType.Text)
    private String name;

    /** Price, mapped as a double field. */
    @Field(type = FieldType.Double)
    private BigDecimal priceAmount;

    /** Stock quantity, mapped as an integer field. */
    @Field(type = FieldType.Integer)
    private int stockQuantity;

    /** Category name, mapped as a keyword field. */
    @Field(type = FieldType.Keyword)
    private String category;

    /** Brand, mapped as a keyword field. */
    @Field(type = FieldType.Keyword)
    private String brand;

    /** Color, mapped as a keyword field. */
    @Field(type = FieldType.Keyword)
    private String color;

    /** Gender, mapped as a keyword field. */
    @Field(type = FieldType.Keyword)
    private String gender;

    /** Popularity score, mapped as an integer field. */
    @Field(type = FieldType.Integer)
    private int popularityScore;

    /** Creation timestamp, mapped as a date field. */
    @Field(type = FieldType.Date)
    private OffsetDateTime createdAt;

    /**
     * Creates a document from already-extracted column values.
     *
     * @param id              document id
     * @param name            product name
     * @param priceAmount     price
     * @param stockQuantity   stock quantity
     * @param category        category name
     * @param brand           brand
     * @param color           color
     * @param gender          gender
     * @param popularityScore popularity score
     * @param createdAt       creation timestamp
     */
    public ProductDocument(
            Long id,
            String name,
            BigDecimal priceAmount,
            int stockQuantity,
            String category,
            String brand,
            String color,
            String gender,
            int popularityScore,
            OffsetDateTime createdAt
    ) {
        this.id = id;
        this.name = name;
        this.priceAmount = priceAmount;
        this.stockQuantity = stockQuantity;
        this.category = category;
        this.brand = brand;
        this.color = color;
        this.gender = gender;
        this.popularityScore = popularityScore;
        this.createdAt = createdAt;
    }

    /**
     * Builds the search document for a domain product.
     *
     * @param product source product; its category is converted with {@code name()}
     * @return a new document carrying the product's values
     */
    public static ProductDocument from(Product product) {
        return new ProductDocument(
                product.getId(), product.getName(), product.getPriceAmount(), product.getStockQuantity(),
                product.getCategory().name(), product.getBrand(), product.getColor(), product.getGender(),
                product.getPopularityScore(), product.getCreatedAt());
    }
}
Document 정의
Index Name: products
ProductSearchConfig.java
package com.marketengine.backend.product.infrastructure.search;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
/**
 * Enables Spring Data Elasticsearch repositories for the
 * {@code com.marketengine.backend.product.infrastructure.search} package.
 * The class itself declares no beans.
 */
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.marketengine.backend.product.infrastructure.search")
public class ProductSearchConfig {
}
ES Repo 디렉토리 지정 config
ProductSearchRepository.java
package com.marketengine.backend.product.infrastructure.search;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
 * Step 2: Spring Data Elasticsearch repository for {@link ProductDocument}, keyed by {@code Long} id.
 * It declares no custom methods; search queries are added in later steps.
 */
public interface ProductSearchRepository extends ElasticsearchRepository<ProductDocument, Long> {
}
ProductIndexInitializer.java
package com.marketengine.backend.product.infrastructure.search;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.stereotype.Component;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
 * Step 2: create the {@value ProductDocument#INDEX_NAME} index and mapping if missing.
 *
 * <p>Registered only when {@code marketengine.elasticsearch.index.ensure-on-startup} is
 * {@code true} or absent.
 */
@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "marketengine.elasticsearch.index.ensure-on-startup", havingValue = "true", matchIfMissing = true)
public class ProductIndexInitializer implements ApplicationListener<ApplicationReadyEvent> {

    private final ElasticsearchOperations elasticsearchOperations;

    /**
     * Checks whether the product index exists and creates it, with the mapping derived from
     * {@link ProductDocument}, when it does not.
     *
     * @param event the application-ready event (unused)
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        IndexOperations productIndex = elasticsearchOperations.indexOps(ProductDocument.class);

        if (!productIndex.exists()) {
            productIndex.createWithMapping();
            log.info("Elasticsearch index created: {}", ProductDocument.INDEX_NAME);
            return;
        }

        // An existing index is left untouched.
        log.info("Elasticsearch index already exists: {}", ProductDocument.INDEX_NAME);
    }
}
ProductDocument 기준으로 index 및 mapping 생성
RDBMS에 대응하면
Index - Table
Document - Row
Mapping - 스키마 정의 방식입니다.

elasticsearch - products 인덱스 생성 확인
이제 데이터를 ES에 적재합니다.
스프링 부트에 reindex 프로필을 설정해서, 웹 서버 없이 reindex만 수행하고 종료할 겁니다.
reindex는 환경에 따라서 긴 시간이 소요될 수 있습니다.
application-reindex.yml
# Bulk reindex profile: no HTTP server, run once and exit.
spring:
  main:
    # Start without an embedded web server.
    web-application-type: none

marketengine:
  elasticsearch:
    reindex:
      # Rows read from the database and sent to Elasticsearch per batch.
      batch-size: 5000
      # Write a progress log line every N batches.
      log-every-batches: 20
ProductBulkReindexer.java
package com.marketengine.backend.product.infrastructure.search;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.List;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
 * Step 3: stream products from PostgreSQL and bulk-index into Elasticsearch.
 *
 * <p>Rows are read in id order with keyset pagination ({@code WHERE id > ? ORDER BY id LIMIT ?}),
 * mapped straight from the JDBC result set to {@link ProductDocument}, and saved batch by batch
 * through {@link ElasticsearchOperations}.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ProductBulkReindexer {

    private static final String BATCH_SIZE_PROPERTY = "marketengine.elasticsearch.reindex.batch-size";
    private static final String LOG_EVERY_BATCHES_PROPERTY = "marketengine.elasticsearch.reindex.log-every-batches";

    private static final String COUNT_SQL = "SELECT COUNT(*) FROM products";

    private static final String BATCH_SQL = """
            SELECT id, name, price_amount, stock_quantity, category, brand, color, gender,
                   popularity_score, created_at
            FROM products
            WHERE id > ?
            ORDER BY id
            LIMIT ?
            """;

    /** Maps one {@code products} row to a {@link ProductDocument} without going through JPA. */
    private static final RowMapper<ProductDocument> ROW_MAPPER = (resultSet, rowNum) -> new ProductDocument(
            resultSet.getLong("id"),
            resultSet.getString("name"),
            resultSet.getBigDecimal("price_amount"),
            resultSet.getInt("stock_quantity"),
            resultSet.getString("category"),
            resultSet.getString("brand"),
            resultSet.getString("color"),
            resultSet.getString("gender"),
            resultSet.getInt("popularity_score"),
            resultSet.getObject("created_at", OffsetDateTime.class)
    );

    private final JdbcTemplate jdbcTemplate;
    private final ElasticsearchOperations elasticsearchOperations;

    /** Number of rows fetched and indexed per batch; must be greater than 0. */
    @Value("${marketengine.elasticsearch.reindex.batch-size:5000}")
    private int batchSize;

    /** A progress line is logged every this many batches; must be greater than 0. */
    @Value("${marketengine.elasticsearch.reindex.log-every-batches:20}")
    private int logEveryBatches;

    /**
     * Reads every row with {@code id > 0} from {@code products} and saves it to the
     * {@value ProductDocument#INDEX_NAME} index, then refreshes the index.
     *
     * @param recreateIndex when {@code true}, the index is deleted (if present) and created again
     *                      with its mapping before loading
     * @return the number of documents sent and the elapsed time
     * @throws IllegalStateException if the configured batch size or log interval is not positive;
     *                               thrown before the index is touched
     */
    public ReindexResult reindex(boolean recreateIndex) {
        // Validate first: a non-positive batch size would otherwise "finish" with zero documents
        // right after the index was dropped, and a zero log interval would fail on the modulo below.
        requirePositive(BATCH_SIZE_PROPERTY, batchSize);
        requirePositive(LOG_EVERY_BATCHES_PROPERTY, logEveryBatches);

        Instant startedAt = Instant.now();
        if (recreateIndex) {
            recreateIndex();
        }

        // queryForObject returns a nullable Long; avoid unboxing a null.
        Long countResult = jdbcTemplate.queryForObject(COUNT_SQL, Long.class);
        long totalRows = countResult == null ? 0L : countResult;
        log.info("Starting product reindex: totalRows={}, batchSize={}", totalRows, batchSize);

        IndexCoordinates index = IndexCoordinates.of(ProductDocument.INDEX_NAME);
        // Keyset cursor: starts at 0, so only rows with id > 0 are read.
        long lastId = 0L;
        long indexed = 0L;
        int batchNumber = 0;
        while (true) {
            List<ProductDocument> batch = jdbcTemplate.query(BATCH_SQL, ROW_MAPPER, lastId, batchSize);
            if (batch.isEmpty()) {
                break;
            }
            elasticsearchOperations.save(batch, index);
            batchNumber++;
            indexed += batch.size();
            // Rows are ordered by id, so the last element carries the next cursor position.
            lastId = batch.get(batch.size() - 1).getId();
            if (batchNumber % logEveryBatches == 0 || indexed >= totalRows) {
                log.info("Reindex progress: indexed={}/{} ({}%)", indexed, totalRows, percent(indexed, totalRows));
            }
        }

        elasticsearchOperations.indexOps(ProductDocument.class).refresh();
        Duration duration = Duration.between(startedAt, Instant.now());
        log.info("Product reindex finished: indexed={}, duration={}s", indexed, duration.toSeconds());
        return new ReindexResult(indexed, duration);
    }

    /** Deletes the product index when it exists, then creates it again with its mapping. */
    private void recreateIndex() {
        IndexOperations indexOperations = elasticsearchOperations.indexOps(ProductDocument.class);
        if (indexOperations.exists()) {
            indexOperations.delete();
            log.info("Deleted Elasticsearch index: {}", ProductDocument.INDEX_NAME);
        }
        indexOperations.createWithMapping();
        log.info("Created Elasticsearch index: {}", ProductDocument.INDEX_NAME);
    }

    /** Fails fast with the offending property name when a numeric setting is zero or negative. */
    private static void requirePositive(String property, int value) {
        if (value <= 0) {
            throw new IllegalStateException(property + " must be greater than 0 but was " + value);
        }
    }

    /** Integer completion percentage capped at 100; an empty table counts as 100. */
    private static int percent(long done, long total) {
        if (total == 0L) {
            return 100;
        }
        return (int) Math.min(100L, (done * 100L) / total);
    }

    /**
     * Outcome of one reindex run.
     *
     * @param indexedCount number of documents sent to Elasticsearch
     * @param duration     elapsed time of the run
     */
    public record ReindexResult(long indexedCount, Duration duration) {
    }
}
JDBC RowMapper로 JDBC ResultSet -> ProductDocument 변환
(JPA로 읽는 것보다 JDBC 배치로 읽는 것이 훨씬 효율적입니다.)
배치크기 - 5000건
ElasticsearchOperations로 적재 요청을 보냅니다.
ProductReindexApplicationRunner.java
package com.marketengine.backend.product.infrastructure.search;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
 * Step 3: run with {@code --spring.profiles.active=reindex} (see scripts/elasticsearch/run-reindex-products.ps1).
 * Optional flag: {@code --recreate-index} deletes and recreates the products index before bulk load.
 *
 * <p>Active only under the {@code reindex} profile. The JVM is terminated when the job ends:
 * exit code 0 on success, 1 when the reindex throws a {@link RuntimeException}.
 */
@Slf4j
@Profile("reindex")
@Component
@RequiredArgsConstructor
public class ProductReindexApplicationRunner implements ApplicationRunner {

    private final ProductBulkReindexer bulkReindexer;
    private final ConfigurableApplicationContext applicationContext;

    /**
     * Runs the bulk reindex once and exits the process.
     *
     * @param args application arguments; the presence of {@code --recreate-index} requests an index rebuild
     */
    @Override
    public void run(ApplicationArguments args) {
        int status = 0;
        try {
            boolean rebuild = args.containsOption("recreate-index");
            ProductBulkReindexer.ReindexResult outcome = bulkReindexer.reindex(rebuild);
            log.info(
                    "Reindex job complete: indexed={}, duration={}s",
                    outcome.indexedCount(),
                    outcome.duration().toSeconds()
            );
        } catch (RuntimeException exception) {
            log.error("Reindex job failed", exception);
            status = 1;
        }
        exit(status);
    }

    /** Closes the application context through Spring Boot and ends the JVM with the resulting code. */
    private void exit(int code) {
        System.exit(SpringApplication.exit(applicationContext, () -> code));
    }
}
Reindex 실행 ApplicationRunner입니다.
recreate 여부를 입력받아 BulkReindexer를 실행합니다
cd backend
# 인덱스 유지 + 전량 재적재 (처음부터 다시 읽으며, 기존 문서는 id 기준 upsert)
.\gradlew.bat bootRun --no-daemon --args="--spring.profiles.active=reindex"
# 인덱스 재생성 + 전량 적재 (첫 bulk / 매핑 바꾼 뒤)
.\gradlew.bat bootRun --no-daemon --args="--spring.profiles.active=reindex --recreate-index"

천만건 적재에 28분 21초 소요됐습니다.
Flyway로 생성한 데이터는 ES에 자동으로 추가되지 않습니다. 따라서 PostgreSQL을 reset했다면 reindex도 한 번 다시 실행해야 합니다.
그 이후 서버 구동 중 개별 상품 CUD에 대해서 ES 자동 동기화 코드를 적용합니다.
ProductSearchIndexer.java
package com.marketengine.backend.product.infrastructure.search;
import org.springframework.stereotype.Component;
import com.marketengine.backend.product.domain.Product;
import com.marketengine.backend.product.domain.ProductRepository;
import lombok.RequiredArgsConstructor;
/**
 * Step 4: keep Elasticsearch {@link ProductDocument} in sync with PostgreSQL on CUD.
 * List/search API still uses PostgreSQL until step 5.
 */
@Component
@RequiredArgsConstructor
public class ProductSearchIndexer {

    private final ProductSearchRepository productSearchRepository;
    private final ProductRepository productRepository;

    /**
     * Saves the search document for the given product.
     *
     * @param product the product that was created or updated
     */
    public void index(Product product) {
        ProductDocument document = ProductDocument.from(resolveProductForIndex(product));
        productSearchRepository.save(document);
    }

    /**
     * Removes the search document with the given id.
     *
     * @param productId id of the deleted product
     */
    public void delete(Long productId) {
        productSearchRepository.deleteById(productId);
    }

    /**
     * {@code created_at} is DB-generated ({@code insertable = false}); reload once after insert if needed.
     *
     * @return the reloaded product when the timestamp is missing and the id is known,
     *         otherwise the instance that was passed in
     */
    private Product resolveProductForIndex(Product product) {
        boolean needsReload = product.getCreatedAt() == null && product.getId() != null;
        if (!needsReload) {
            return product;
        }
        // Fall back to the in-memory instance when the row cannot be found.
        return productRepository.findById(product.getId()).orElse(product);
    }
}
필요시 product reload 후 ES Repo에 요청 보냅니다.

service: save, update, delete -> flush -> indexer 코드 추가
이제 데이터 적재 및 CUD 동기화까지 완료했습니다.
검색을 PostgreSQL -> Elasticsearch로 전환합니다.
'백엔드 엔지니어링 일지' 카테고리의 다른 글
| 마켓 백엔드 엔진 14 : k6 검색 로드 테스트 - 2 (0) | 2026.05.29 |
|---|---|
| 마켓 백엔드 엔진 13 : ElasticSearch - 2 / Redis 캐싱 (0) | 2026.05.28 |
| 마켓 백엔드 엔진 11 : k6 검색 로드 테스트 - 1 (0) | 2026.05.22 |
| 마켓 백엔드 엔진 10 : 키워드 조합/유사도 검색 - pg_trgm (0) | 2026.05.18 |
| 마켓 백엔드 엔진 9 : k6 부하 테스트 - Prometheus, Grafana 연동 (0) | 2026.05.13 |