경매 입찰 & 작품 구매 동시성 문제 해결 3 - Redis Lock
개요
앞선 포스팅에서 Pessimistic Write Lock & MySQL Named Lock을 통해서 경매 입찰 & 작품 구매에 대한 동시성 문제를 해결하였다
Pessimistic Write Lock의 DB Record Lock으로 인해 다른 로직에 영향을 주는 것을 해결하고자 MySQL Named Lock을 적용해보았고 그 결과는 성공적이였다
하지만 MySQL Named Lock을 통해서 제어할 경우 아래와 같은 요소들을 적절히 고려해야 한다
- 적절한 DBCP 사이즈는 어떻게 결정할지
- Lock과 관련된 Connection을 어떻게 기존 로직과 분리해서 처리할지
- Lock을 얻고난 후
반드시 명시적 해제
를 적용하였는지 - …
휴먼 에러가 발생할 가능성이 존재하고 값비싼 DB Connection에 대한 제어를 잘못하면 전체 서비스 성능에 영향을 미칠 수 있기 때문에 더 효율적인 방안이 없나 고민을 할 필요가 있어보인다
고민을 하다가 현재 프로젝트에서는 인증번호 TTL을 관리하기 위해서 Redis를 활용하고 있다
따라서 MySQL Named Lock의 접근 방법과 동일하게 외부 어디선가 Lock을 관리
해야 하고 이 영역을 Redis로 생각하고 구현해보자
Redis를 활용한 동시성 제어 1) Lettuce
1
implementation("org.springframework.boot:spring-boot-starter-data-redis")
spring-boot-starter-data-redis의 기본 Client로 제공되는 Lettuce
는 특정 명령어를 통해서 Lock이라는 개념을 관리할 수 있다
결국 Thread가 Lock을 얻고 핵심 로직에 진입하기 위해서는 아래 2가지 절차를 Atomic
하게 진행해야 한다
- Lock이 존재하는지 확인 (다른 쓰레드가 이미 점유하고 있는지)
- Lock이 프리하면 획득하고 진입
이 2가지 연산을 Atomic하게 처리하기 위해서 활용하는 명령어는
setnx(set if not exists)
이다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
@RequiredArgsConstructor
public class BidFacade {
private static final String ACQUIRE = "1";
private final StringRedisTemplate redisTemplate;
private final BidUseCase target;
public void invoke(final BidCommand command) throws InterruptedException {
final String key = "AUCTION:" + command.auctionId();
while (!tryLock(key)) {
Thread.sleep(100); // Redis에 가하는 Spin Lock 부하를 줄이기 위해서 잠시 Sleep
}
try {
target.invoke(command);
} finally {
unlock(key);
}
}
private boolean tryLock(final String key) {
return redisTemplate.opsForValue().setIfAbsent(key, ACQUIRE);
}
private void unlock(final String key) {
redisTemplate.delete(key);
}
}
- 동시성 제어에 성공하였다
하지만 Lettuce를 활용한 위와 같은 Lock 제어 방식은 아래 2가지 단점이 존재한다
1. 무한 루프 대기
위의 코드를 보면 while(!tryLock(key))
를 통해서 lock을 획득하려고 시도한다
그런데 만약 특정 Thread가 lock을 획득하는데 성공하고 비즈니스 로직을 진행하던 도중 Application의 의도치 않은 오류 때문에 lock을 해제하지 못했다고 가정
하자
이렇게 된 순간 해당 Thread가 아닌 나머지 Thread들은 영원히 Lock을 얻지 못하고 무한 루프에 빠지게 되는 것이다
이 경우는 maxRetry 정책을 직접 구현해주면 어느정도 해결이 될듯하다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static final int MAX_RETRY = 3; // 최대 재시도 횟수
public void invoke(final BidCommand command) throws InterruptedException {
final String key = "AUCTION:" + command.auctionId();
int retry = 0;
while (!tryLock(key)) {
if (++retry == MAX_RETRY) { // 최대 횟수 확인
throw new RuntimeException();
}
Thread.sleep(100); // Redis에 가하는 Spin Lock 부하를 줄이기 위해서 잠시 Sleep
}
try {
target.invoke(command);
} finally {
unlock(key);
}
}
2. Spin Lock
무한 루프에 빠지게 되는 위험은 최대 재시도 횟수 제한을 적용함으로써 어느정도 커버가 가능하다
하지만 근본적으로 위와 같은 구현을 하게 되면 Lock을 획득할때까지 Redis에 가해지는 지속적인 부하
가 Redis 서버에 부담이 될 수 있다는 것이다
위의 코드는 이러한 부하를 약간은 방지하기 위해서 Thread.sleep(100)
을 통해서 재시도를 하기 위한 텀을 제공하였다
그래도 이 텀 간격은 언제든지 수정될 수 있고 요청이 많을수록, 작업이 오래 걸릴수록
이러한 부하는 점점 Redis 서버에 큰 부담이 될 것이다
Redis를 활용한 동시성 제어 2) Redisson
1
implementation("org.redisson:redisson-spring-boot-starter:${property("redissonVersion")}")
Lettuce와 비슷하게 Redisson도 Netty를 활용한 Non-Blocking I/O 메커니즘이다
하지만 Lettuce와 다른 점은 Lock을 관리하기 위한 구현체 자체를 제공
해준다는 점이다
Redisson은 위에서 지적한 Lettuce의 2가지 단점 (무한 대기, Spin Lock)을 어떻게 해결함으로써 더 효율적으로 동작하는지 확인해보자
1. 무한 대기? Timeout
RedissonLock은 획득 시점에 다음과 같은 필드 정보를 추적한다
- waitTime = Lock을 획득할때까지 대기하는 시간
- leaseTime = Lock이 만료되는 시간
Lock을 얻는 시점에서부터 이러한 대기 & 만료 시간을 함께 관리하기 때문에 Thread가 Lock을 얻기 위해서 무한 대기하는 상황이 발생하지 않는다
2. Spin Lock? pub/sub
lock을 얻는 프로세스를 따라가보면 subscribe
가 보인다
CompletableFuture
를 활용해서 비동기적으로 구독을 진행한다- 이후 구독한 채널에 Lock Release와 관련된 메시지가 도달하면 그 시점에 Lock을 얻으려는 시도를 다시 진행한다
- waitTime을 고려해서 timeout이 발생하면 예외를 발생시키고 구독하고 있는 Channel에 대한 구독을 취소한다
Redisson의 pub/sub 기반 Lock을 획득하는 프로세스는 다음과 같다
- tryLock을 통해서 대기 없이 Lock을 요구할 수 있고 다른 Thread와 경합이 없다면 그대로 Lock을 획득
- Lock을 획득하지 못했으면 이후
subscribe한 channel에서 "Lock이 해제되었습니다"라는 메시지
를 받으면 획득 재시도 - waitTime이 끝날때까지 Lock을 획득하지 못하면 return false
이러한 pub/sub 기반 Lock 요청 방식으로 인해 이전 Lettuce에서 보았던 Spin Lock보다 훨씬 Redis 서버에 부하를 덜 주게 된다
- 또한 Lock을 획득하려는 시도는 루아 스크립트를 통해서 Atomic하게 연산된다
RedissonLock 적용
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Component
@RequiredArgsConstructor
public class BidFacade {
private final RedissonClient redissonClient;
private final BidUseCase target;
public void invoke(final BidCommand command) throws InterruptedException {
final String key = "AUCTION:" + command.auctionId();
final RLock lock = redissonClient.getLock(key);
try {
if (!tryLock(lock)) {
return; // 획득 못하면 return -> 재시도 정책 추가 가능
}
target.invoke(command);
} finally {
unlock(lock);
}
}
private boolean tryLock(final RLock lock) throws InterruptedException {
return lock.tryLock(5, 1, TimeUnit.SECONDS);
}
private void unlock(final RLock lock) {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
} else {
throw new RuntimeException("anonymous try unlock or timeout...");
}
}
}
재사용을 위한 AOP 메커니즘 활용
Redisson을 활용한 동시성 제어 로직은 경매 입찰
뿐만 아니라 작품 구매
에서도 활용할 예정이다
따라서 재사용성
을 높이기 위해서 애노테이션 + AOP 메커니즘
을 적용하는 것이 낫다고 판단된다
@DistributedLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
String keyPrefix();
String keySuffix();
long waitTime() default 5000L;
long leaseTime() default 3000L;
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
boolean withInTransaction() default false;
}
keyPrefix & keySuffix로 나눈 이유는 물론 key하나로 SpEL을 통해서 문자열을 완성시킬 수 있지만 내부 문자열 구조자체가 깔끔하지 않다고 판단해서 분리하였다
- keyPrefix = 고정적인 값
- keySuffix = 메소드에 넘어온 인자를 활용한 가변적인 값
ExpressionParser
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DistributedLockNameGenerator {
private static final ExpressionParser expressionParser = new SpelExpressionParser();
public static Object generate(
final String prefix,
final String key,
final String[] parameterNames,
final Object[] args
) {
final StandardEvaluationContext context = new StandardEvaluationContext();
for (int i = 0; i < parameterNames.length; i++) {
context.setVariable(parameterNames[i], args[i]);
}
return prefix + parseKey(key, context);
}
private static Object parseKey(final String key, final StandardEvaluationContext context) {
return expressionParser.parseExpression(key).getValue(context, Object.class);
}
}
- Key Prefix로 들어가는 값은 동적으로 적용되는 auctionId이기 때문에 이를 파싱하기 위한 커스텀한 Parser를 구현할 필요가 있다
AOP 로직
1
2
3
4
5
6
7
@Component
public class AopWithTransactional {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public Object proceed(final ProceedingJoinPoint joinPoint) throws Throwable {
return joinPoint.proceed();
}
}
- target 로직을 Transaction Scope에서 처리하기 위한 Helper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class DistributedLockAop {
private final RedissonClient redissonClient;
private final AopWithTransactional aopWithTransactional;
@Around("@annotation(DistributedLock)")
public Object lock(final ProceedingJoinPoint joinPoint) {
final MethodSignature signature = (MethodSignature) joinPoint.getSignature();
final Method method = signature.getMethod();
final DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);
final String key = (String) DistributedLockNameGenerator.generate(
distributedLock.keyPrefix(),
distributedLock.keySuffix(),
signature.getParameterNames(),
joinPoint.getArgs()
);
final RLock lock = redissonClient.getLock(key);
try {
acquireLock(lock, distributedLock);
log.info(
"Thread[{}] -> [{}] lock acquired with in transaction = {}",
Thread.currentThread().getName(),
lock.getName(),
distributedLock.withInTransaction()
);
if (distributedLock.withInTransaction()) {
return aopWithTransactional.proceed(joinPoint);
}
return joinPoint.proceed();
} catch (final InterruptedException e) {
throw new RuntimeException("Interrupt occurred when acquire lock...", e);
} catch (final Throwable e) {
throw new RuntimeException(e);
} finally {
release(lock);
}
}
private void acquireLock(final RLock lock, final DistributedLock distributedLock) throws InterruptedException {
if (!tryLock(lock, distributedLock)) {
throw new RuntimeException("Failed to acquire lock...");
}
}
private boolean tryLock(final RLock lock, final DistributedLock distributedLock) throws InterruptedException {
return lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit());
}
private void release(final RLock lock) {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
try {
lock.unlock();
log.info("Thread[{}] -> [{}] lock released", Thread.currentThread().getName(), lock.getName());
} catch (final Throwable e) {
log.error("Failed to release lock", e);
}
} else {
log.error(
"[{}] Alreay unlock or timeout... -> isLocked = {} || isHeldByCurrentThread = {}",
Thread.currentThread().getName(),
lock.isLocked(),
lock.isHeldByCurrentThread()
);
throw new RuntimeException("anonymous try unlock or timeout...");
}
}
}
최종 적용
1) 입찰 로직
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@UseCase
@RequiredArgsConstructor
public class BidUseCase {
private final AuctionReader auctionReader;
private final ArtReader artReader;
private final MemberReader memberReader;
private final BidInspector bidInspector;
private final BidProcessor bidProcessor;
@DistributedLock(
keyPrefix = "AUCTION:",
keySuffix = "#command.auctionId",
withInTransaction = true
)
public void invoke(final BidCommand command) {
final Auction auction = auctionReader.getById(command.auctionId());
final Art art = artReader.getById(auction.getArtId());
final Member bidder = memberReader.getById(command.memberId());
bidInspector.checkBidCanBeProceed(auction, art, bidder, command.bidPrice());
bidProcessor.execute(auction, bidder, command.bidPrice());
}
}
2) 구매 로직
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
@UseCase
@RequiredArgsConstructor
public class PurchaseArtUseCase {
private final ArtReader artReader;
private final MemberReader memberReader;
private final PurchaseProcessor purchaseProcessor;
@DistributedLock(
keyPrefix = "ART:",
keySuffix = "#command.artId",
withInTransaction = true
)
public void invoke(final PurchaseArtCommand command) {
final Art art = artReader.getById(command.artId());
final Member owner = memberReader.getById(art.getOwnerId());
final Member buyer = memberReader.getById(command.memberId());
if (art.isAuctionType()) {
purchaseProcessor.purchaseAuctionArt(art, owner, buyer);
} else {
purchaseProcessor.purchaseGeneralArt(art, owner, buyer);
}
}
}
@Service
@RequiredArgsConstructor
public class PurchaseProcessor {
private final AuctionReader auctionReader;
private final PurchaseInspector purchaseInspector;
private final PurchaseWriter purchaseWriter;
private final PointRecordWriter pointRecordWriter;
private final AssociatedPointTransactionProcessor associatedPointTransactionProcessor;
@AnotherArtWritableTransactional
public void purchaseAuctionArt(
final Art art,
final Member owner,
final Member buyer
) {
final Auction auction = auctionReader.getByArtId(art.getId());
purchaseInspector.checkAuctionArt(auction, art, buyer);
art.closeSale();
final Purchase purchase = Purchase.purchaseAuctionArt(art, buyer, auction.getHighestBidPrice());
proceedPurchase(purchase, owner, buyer);
associatedPointTransactionProcessor.executeWithPurchaseAuction(owner, buyer, purchase.getPrice());
}
@AnotherArtWritableTransactional
public void purchaseGeneralArt(
final Art art,
final Member owner,
final Member buyer
) {
purchaseInspector.checkGeneralArt(art, buyer);
art.closeSale();
final Purchase purchase = Purchase.purchaseGeneralArt(art, buyer);
proceedPurchase(purchase, owner, buyer);
associatedPointTransactionProcessor.executeWithPurchaseGeneral(owner, buyer, purchase.getPrice());
}
private void proceedPurchase(
final Purchase purchase,
final Member owner,
final Member buyer
) {
purchaseWriter.save(purchase);
pointRecordWriter.save(
PointRecord.addArtSoldRecord(owner, purchase.getPrice()),
PointRecord.addArtPurchaseRecord(buyer, purchase.getPrice())
);
}
}
@Service
@RequiredArgsConstructor
public class AssociatedPointTransactionProcessor {
@AnotherArtWritableTransactional
public void executeWithPurchaseAuction(
final Member owner,
final Member buyer,
final int price
) {
// 1. 구매자 포인트 차감
buyer.increaseAvailablePoint(price); // 입찰 시 소모한 포인트 누적 차감 문제 해결
buyer.decreaseTotalPoint(price);
// 2. 판매자 포인트 적립
owner.increaseTotalPoint(price);
}
@AnotherArtWritableTransactional
public void executeWithPurchaseGeneral(
final Member owner,
final Member buyer,
final int price
) {
// 1. 구매자 포인트 차감
buyer.decreaseTotalPoint(price);
// 2. 판매자 포인트 적립
owner.increaseTotalPoint(price);
}
}
Redis 분산락 Timeout & Optimistic Lock
한 스텝 더 나아가서 다음과 같은 케이스를 생각해보자
Redis 분산락은 Timeout
이 발생했지만Application Transaction은 여전히 유효
하다면?
이러한 케이스에서는 다시 동시성 문제가 발생할 수 있기 때문에 위와 같은 문제를 방지하기 위해서 2차적인 방어 로직으로 Optimistic Lock
을 활용해서 데이터 정합성을 보장하는 것이 좋아보인다
CAS(Compare & Set) 연산 - OptimisticLocking
을 통해서 여러 트랜잭션에서의 데이터 정합성을 보장하는 메커니즘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Getter
@NoArgsConstructor(access = PROTECTED)
@Entity
@Table(name = "auction")
public class Auction extends BaseEntity<Auction> {
@Column(name = "art_id", nullable = false, updatable = false, unique = true)
private Long artId;
@Embedded
private Period period;
@Column(name = "highest_bidder_id")
private Long highestBidderId;
@Column(name = "highest_bid_price", nullable = false)
private int highestBidPrice;
@Version
private long version;
@OneToMany(mappedBy = "auction", cascade = CascadeType.PERSIST)
private final List<AuctionRecord> auctionRecords = new ArrayList<>();
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
String keyPrefix();
String keySuffix();
long waitTime() default 5000L;
long leaseTime() default 3000L;
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
boolean withInTransaction() default false;
/**
* 분산락 Timeout을 고려한 Optimistic Lock Retry
*/
int withRetry() default -1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class DistributedLockAop {
private final RedissonClient redissonClient;
private final AopWithTransactional aopWithTransactional;
@Around("@annotation(DistributedLock)")
public Object lock(final ProceedingJoinPoint joinPoint) {
final MethodSignature signature = (MethodSignature) joinPoint.getSignature();
final Method method = signature.getMethod();
final DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);
final String key = (String) DistributedLockNameGenerator.generate(
distributedLock.keyPrefix(),
distributedLock.keySuffix(),
signature.getParameterNames(),
joinPoint.getArgs()
);
final RLock lock = redissonClient.getLock(key);
int currentRetry = 0;
while (true) {
try {
if (tryMaximum(currentRetry++, distributedLock, method)) {
throw new RuntimeException("Retry Exception...");
}
acquireLock(lock, distributedLock);
log.info(
"Thread[{}] -> [{}] lock acquired with in transaction = {}",
Thread.currentThread().getName(),
lock.getName(),
distributedLock.withInTransaction()
);
if (distributedLock.withInTransaction()) {
return aopWithTransactional.proceed(joinPoint);
}
return joinPoint.proceed();
} catch (final ObjectOptimisticLockingFailureException e) {
log.info(
"[{}] Optimistic Lock Version Miss... -> retry = {}, maxRetry = {}, withInTransaction = {}",
Thread.currentThread().getName(),
currentRetry,
distributedLock.withRetry(),
distributedLock.withInTransaction()
);
try {
Thread.sleep(100);
} catch (final InterruptedException ex) {
throw new RuntimeException(ex);
}
} catch (final InterruptedException e) {
throw new RuntimeException("Interrupt occurred when acquire lock...", e);
} catch (final Throwable e) {
throw new RuntimeException(e);
} finally {
release(lock);
}
}
}
private boolean tryMaximum(final int currentRetry, final DistributedLock distributedLock, final Method method) {
if (distributedLock.withRetry() != -1 && distributedLock.withRetry() == currentRetry) {
log.error("Retry Exception... -> method = {}", method);
return true;
}
return false;
}
private void acquireLock(final RLock lock, final DistributedLock distributedLock) throws InterruptedException {
if (!tryLock(lock, distributedLock)) {
throw new RuntimeException("Failed to acquire lock...");
}
}
private boolean tryLock(final RLock lock, final DistributedLock distributedLock) throws InterruptedException {
return lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit());
}
private void release(final RLock lock) {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
try {
lock.unlock();
log.info("Thread[{}] -> [{}] lock released", Thread.currentThread().getName(), lock.getName());
} catch (final Throwable e) {
log.error("Failed to release lock", e);
}
} else {
log.error(
"[{}] Alreay unlock or timeout... -> isLocked = {} || isHeldByCurrentThread = {}",
Thread.currentThread().getName(),
lock.isLocked(),
lock.isHeldByCurrentThread()
);
throw new RuntimeException("anonymous try unlock or timeout...");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@UseCase
@RequiredArgsConstructor
public class BidUseCase {
private final AuctionReader auctionReader;
private final ArtReader artReader;
private final MemberReader memberReader;
private final BidInspector bidInspector;
private final BidProcessor bidProcessor;
@DistributedLock(
keyPrefix = "AUCTION:",
keySuffix = "#command.auctionId",
withInTransaction = true,
withRetry = 3
)
public void invoke(final BidCommand command) {
final Auction auction = auctionReader.getById(command.auctionId());
final Art art = artReader.getById(auction.getArtId());
final Member bidder = memberReader.getById(command.memberId());
bidInspector.checkBidCanBeProceed(auction, art, bidder, command.bidPrice());
bidProcessor.execute(auction, bidder, command.bidPrice());
}
}