ν€μ΄μ΅ κ΄λ¦¬ μμ€ν μμ κ³ κ°λ€μκ² λ¨μ²΄ λ©μμ§λ₯Ό μ μ‘νλ κΈ°λ₯μ κ°λ°νλ μ€, κ³ λ―Όν λΆλΆμ΄ μκ²Όμ΅λλ€.
// β λ¬Έμ κ° μλ κΈ°μ‘΄ λ°©μ
for(ShopMessageHistory history : historyList){
processMessageAsync(history, request, taskId); // κ°κ° μ μ€λ λ μμ±
}
λ¬Έμ μ :
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "messageTaskExecutor")
public TaskExecutor messageTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3); // κΈ°λ³Έ μ€λ λ μ
executor.setMaxPoolSize(10); // μ΅λ μ€λ λ μ
executor.setQueueCapacity(100); // λκΈ° ν ν¬κΈ°
executor.setThreadNamePrefix("Message-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
@Service
public class MessageService {
@Async("messageTaskExecutor")
public void processAllMessagesAsync(List<ShopMessageHistory> historyList,
MessageDTO request,
String taskId,
Long batchId) {
log.info("π¦ [{}] μ 체 λ©μμ§ μ²λ¦¬ μμ - μ΄ {}건", taskId, historyList.size());
int successCount = 0;
int failCount = 0;
for(ShopMessageHistory history : historyList) {
try {
boolean success = sendSingleMessage(history, request);
if(success) {
successCount++;
updateMessageStatus(history, MessageStatus.SUCCESS);
} else {
failCount++;
updateMessageStatus(history, MessageStatus.FAILED);
}
// API μ ν λ°©μ§λ₯Ό μν λλ μ΄
if((successCount + failCount) % 10 == 0) {
Thread.sleep(200); // 10건λ§λ€ 0.2μ΄ λκΈ°
}
} catch (Exception e) {
failCount++;
log.error("[{}] λ©μμ§ μ μ‘ μ€ν¨ - UserCode: {}, Error: {}",
taskId, history.getUserCode(), e.getMessage());
updateMessageStatus(history, MessageStatus.FAILED);
}
}
// λ°°μΉ κ²°κ³Ό μ
λ°μ΄νΈ
updateBatchResult(batchId, successCount, failCount);
log.info("β
[{}] μ 체 μ²λ¦¬ μλ£ - μ±κ³΅: {}건, μ€ν¨: {}건", taskId, successCount, failCount);
}
private void updateBatchResult(Long batchId, int successCount, int failCount) {
messageSendBatchService.updateBatchResult(batchId, successCount, failCount);
}
}
public MessageResponse sendMessageAsync(MessageDTO request) {
String taskId = generateTaskId();
log.info("π© [{}] λ©μμ§ μμ² μ μ - UserCode μ: {}", taskId, request.getTo().size());
// 1. batch DBμ μ μ₯
MessageSendBatchDTO batchDTO = MessageSendBatchDTO.builder()
.shopCode(request.getFrom())
.sendType(request.getSendType())
.subject(request.getSubject())
.totalCount(request.getTo().size())
.successCount(0)
.failCount(0)
.build();
MessageSendBatchDTO createdBatchDTO = messageSendBatchService.createMessageBatch(batchDTO);
// 2. history μ μ₯
List<ShopMessageHistory> historyList = new ArrayList<>();
for(Integer userCode : request.getTo()){
historyList.add(saveAsPending(userCode, request.getText(), createdBatchDTO));
}
// 3. PENDINGμΌλ‘ μλ΅
MessageResponse response = new MessageResponse(MessageStatus.PENDING.toString());
// 4. β
λ¨μΌ λΉλκΈ° μμ
μΌλ‘ λͺ¨λ λ©μμ§ μ²λ¦¬
processAllMessagesAsync(historyList, request, taskId, createdBatchDTO.getId());
log.info("β‘ [{}] PENDING μλ΅ μ¦μ λ°ν", taskId);
return response;
}
λ¬Έμ : λλ λ©μμ§ μ μ‘ μ λ©λͺ¨λ¦¬ λΆμ‘± μλ¬ λ°μ
// β λ¬Έμ κ° λλ μ½λ
List<ShopMessageHistory> historyList = new ArrayList<>();
for(int i = 0; i < 10000; i++) {
historyList.add(createHistory(userCodes.get(i)));
}