ConcurrentHashmap 동시성 처리 방법
ConcurrentHashMap 이란?
- Java의 동시성을 지원하는 HashMap의 변형이다. 여러 스레드가 동시에 데이터에 접근하고 수정할 때 발생할 수 있는 문제를 해결하기 위해 설계된 자료구조이다. 기본적으로 Java 의 HashMap은 스레드에 안전하지 않기 때문에 여러 스레드가 동시에 접근할 경우 데이터 무결성이 깨질 수 있다.
- HashTable/Synchronized Map 은 전체 lock 을 걸어 사용하고 있기 때문에 thread-safe 한 면은 있지만 성능 오버헤드를 발생시킨다.
- 세그먼트라는 작은 해시 테이블 여러 개를 만들어서 관리한다.
위 그림을 보면 보다 명확하게 알 수 있다.
HashMap, Synchronized 는 synchronized 키워드로 HashMap 메서드를 래핑해서 사용하기 때문에 null key 나 여러 null value 값을 허용할 수 있다.
Hashtable, SynchronizedMap 다 Map 객체 전체에 대한 lock을 획득한 후 스레드 안정성을 제공하게 된다. thread-safe 하지만 성능을 좋지 않아 보입니다.
HashTable 과 SynchronizedMap 이 Thread-Safe 한 이유
- HashTable (put, get) 에 synchronized 키워드가 적용되어 있다.
- 단일락으로 구성되어 있어서 한 스레드가 작업을 수행하는 동안 다른 스레드는 대기해야 한다. 이로 인해 성능은 저하될 수 있다.
public synchronized V get(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return (V)e.value;
}
}
return null;
}
public synchronized V put(K key, V value) {
// Make sure the value is not null
if (value == null) {
throw new NullPointerException();
}
// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> entry = (Entry<K,V>)tab[index];
for(; entry != null ; entry = entry.next) {
if ((entry.hash == hash) && entry.key.equals(key)) {
V old = entry.value;
entry.value = value;
return old;
}
}
addEntry(hash, key, value, index);
return null;
}
- SynchronizedMap 은 Collections.synchronizedMap(Map) 메소드를 통해 생성된다.
- 모든 메서드 호출 시 synchronized block 을 사용하여 락을 걸어준다.
Map<Integer, String> synchrMap = Collections.synchronizedMap(new HashMap<>());
// Synchronized block
synchronized (synchrMap) {
// Iterate synchronized map
for (Map.Entry<Integer, String> entry : synchrMap.entrySet()) {
// Print key : value
System.out.println(entry.getKey() + " : " + entry.getValue());
}
}
그럼 위 두 자료구조의 차이점은?
- SynchronizedMap은 특정 메서드 전체가 아닌 필요한 부분에만 락을 걸 수 있다. HashTable 보다 조금 더 세밀한 제어가 가능하다.
ConcurrentHashMap
- 멀티스레드 환경에서 안전하게 사용할 수 있는 해시 테이블 구조이다.
get() 은 동시성 문제를 일으키지는 않지만, 최신 데이터를 반드시 보장하지 않는다. 예를 들어, 한 스레드가 데이터를 쓰는 도중에 다른 스레드가 그 데이터를 읽으면 과거의 값을 읽을 수 있다.
이러한 문제는 목적에 따라 달라지게 되는데 데이터 일관성 보다는 성능 최적화에 중점을 두고 있기 때문인거 같다.
그럼에도 get이 동시성을 보장하는 이유는 내부적으로 volatile 변수를 사용해 메모리 가시성을 보장한다. volatile 키워드는 특정 필드에 대해 스레드 간의 메모리 가시성을 제공한다. ConcurrentHashMap 버킷들은 노드 배열을 사용하고, 이 배열에 대한 참조는 volatile 로 선언되어 있다. 이를 통해서 여러 스레드가 동시에 읽기를 할 때 최신 값을 읽을 수 있도록 보장된다.
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 해시값을 계산하여 해당 버킷에 접근
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & hash(key))) != null) {
if ((eh = e.hash) == hash(key)) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 버킷이 여러 개의 노드를 가지고 있을 때 연결 리스트를 순회
if ((p = e.next) != null) {
do {
if ((eh = p.hash) == hash(key) &&
((ek = p.key) == key || (ek != null && key.equals(ek))))
return p.val;
} while ((p = p.next) != null);
}
}
return null;
}
*volatile 메모리 가시성 보장
각 CPU 내 캐시메모리에서 값을 옮기고 난 후 그 값으로 연산을 한 뒤 메인메모리에 반영을 하게 된다. 즉, 메모리 가시성이라는것은 스레드가 CPU 캐시 대신 메인 메모리에 값을 읽고 쓴다는 의미이다. 이를 통해 다른 스레드가 이 변수를 읽을 때 이전 스레드가 쓴 값을 읽을 수 있게 보장한다. 그치만, 경합이 발생하는 상황에서는 항상 최신 값을 읽을 수 있음을 보장하지는 않는다.
put()은 동시에 여러 스레드가 데이터를 추가할 수 있도록 하지만 같은 버킷에 대해서는 동기화식으로 처리하는 것이다.
- 해시 값과 버킷 배열 크기를 이용해 어느 슬롯에 데이터를 저장할지 결정한다.
int hash = spread(key.hashCode());
int bin = (n - 1) & hash;
- 버킷이 비어 있으면 CAS 를 사용해 추가
- 만약 버킷이 비어있으면 CAS 를 사용해서 새 노드를 추가한다. 이 과정에서 synchronized 를 사용하지 않기 때문에 동시성이 높아진다. CAS 는 CPU 레벨에서 락을 걸지 않고, 값이 예상한 대로일 때만 업데이트하는 방식으로 동시성을 처리한다.
- casTabAt 은 CAS 연산을 통해, 해당 버킷이 여전히 비어 있는지 확인하고 그 자리에 노드를 추가한다. 이 방식으로 락을 사용하지 않고 동시성을 처리한다.
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {
return null; // 정상적으로 데이터가 추가된 경우
}
- 버킷에 이미 데이터가 있으면 동기화
- 해당 버킷에 데이터가 존재한다면, synchronized 블록을 사용해 동기화한다. 이 동기화는 해당 버킷에 대해서만 이루어진다. 다른 버킷에는 영향을 주지 않는다. 즉, 각 버킷에 대해서는 별도로 락을 걸리는 구조가 된다.
synchronized (f) {
// 버킷에 있는 첫 번째 노드에 대해서 락을 걸고 작업 수행
if (tabAt(tab, i) == f) {
if (f.hash == MOVED)
tab = helpTransfer(tab, f);
else {
binCount = 1;
// 새로 추가할 키가 동일한지 확인한 후,
// 연결 리스트나 트리 노드를 통해 값을 저장하거나 덮어쓰기
for (Node<K,V> e = f; ; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
e.val = value;
break;
}
if ((pred = e).next == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
e = e.next;
}
}
}
}
- 트리화
- 버킷에 저장된 노드가 일정 개수(기본적으로 8개) 초과하면, 연결 리스트는 트리 구조로 변환된다. 이렇게 하면 탐색 시간이 O(n) 에서 O(log n)으로 줄어들어 성능이 향상된다.
if (binCount >= TREEIFY_THRESHOLD) {
treeifyBin(tab, i);
}
- 리사이징
- 데이터가 일정 크기가 초과되면, 해시 테이블은 리사이징된다. 이 작업은 테이블 크기를 2배로 늘리고, 기존 데이터를 새로운 배열에 재배치한다. 리사이징 작업은 다른 스레드들과 충돌을 막기 위해 동기화되서 이루어진다.
put 코드
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //해당 인덱스의 버킷이 비어있는지 확인하고 해당 위치에 새 노드를 추가
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //해시 값을 기반으로 배열에서 해당 버킷의 인덱스를 결정한다.
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // successfully added new node
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { //해당 버킷이 이미 노드가 존재하면 그 노드에 대해서 synchronized 블록을 사용
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) // 기본값 8 이상이면 연결 리스트를 트리 구조로 변환 성능을 올린다.
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//리사이징 : 데이터가 추가되면 아래 메소드를 호출해서 테이블의 크기를 동적으로 조절한다. 임계값에 도달하면 리사이징 진행해 해시테이블의 크기를 늘린다.
addCount(1L, binCount);
return null;
}
ConcurrentHashMap 이 동시성을 처리하는 방법 (CAS, Synchronized 방법 기반)
CAS 란 ? (Compare-And-Swap) 이란?
- 비교 후 교환이라는 원자적 연산을 이용해서 아래 세 개의 값을 비교합니다.
- 현재 값
- 기대하는 값
- 새로운 값
- 현재 값과 기대하는 값이 같으면 새로운 값으로 교체하고, 그렇지 않으면 아무 작업도 하지 않는다. 이 과정은 Atomic 하게 처리한다. 그래서 여러 스레드가 동시에 접근해도 안전하게 값을 업데이트할 수 있다.
ConcurrentHashMap 내부적으로는 Segment 또는 Bucket 을 사용하여 데이터가 저장된다. 각 세그먼트는 자체적으로 동기화되어 있어서 여러 스레드에서 동시에 접근해도 안전하게 데이터를 처리할 수 있다.
ConcurrentHashMap의 CAS 적용방식
- ConcurrentHashMap 은 데이터를 삽입하거나 수정할 때 각 버킷에 대해 락을 거는 대신 CAS 로 변경을 처리한다.
- 키의 값이 있는지 조회
- CAS로 값 업데이트
- 다른 스레드와 충돌 시 다시 시도하거나 백오프 전략을 사용한다.
예 : putIfAbsent() 메서드
메서드의 값이 없을 경우에만 값을 삽입하는 메서드이다.
public V putIfAbsent(K key, V value) {
int hash = hash(key); // 해시 값을 계산
Node<K,V>[] tab; // 내부 노드 배열(버킷)
Node<K,V> first; // 첫 번째 노드 참조
// 해당 버킷에 대해 CAS로 값을 비교하고 업데이트
if ((first = tabAt(tab, hash)) == null) {
if (casTabAt(tab, hash, null, new Node<K,V>(hash, key, value, null))) {
return null; // 값이 없었으므로 삽입 후 null 반환
}
}
// 값이 있으면 기존 값 반환
return first.value;
}
CAS의 장점
- 락을 사용하지 않는다.
- 낮은 경합 하지만, 많은 스레드가 동일한 값을 업데이트하려고 하면 실패율이 높아질 수 있다.
- 다중 스레드 충돌이 생기면 락을 사용해 혼합 기법 적용
ConcurrentHashMap 에서 synchronized 사용
특정 상황에서 synchronized 블록을 적용해 데이터를 보호할 수 있다. 주로, 리사이징, 해시 테이블의 재배치, 복잡한 연산이 필요한 경우 적용된다. 간단히 얘기하면 노드가 추가되거나 삭제하는 것처럼 여러 스레드가 동시에 구조를 변경해야 할 때는 전체적인 데이터 구조를 보호 목적으로 락을 사용한다.
public V put(K key, V value) {
int hash = spread(key.hashCode()); // 해시 값을 계산
Node<K,V>[] tab; // 해시 테이블 참조
Node<K,V> f; // 첫 번째 노드 참조
int n, i;
if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// CAS 방식으로 빈 슬롯에 값을 추가하려 시도
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {
return null; // 성공 시 null 반환
}
}
// 복잡한 경우 (해시 충돌 등), synchronized를 사용하여 동시성 제어
synchronized (f) {
if (tabAt(tab, i) == f) {
// 해시 충돌을 처리하는 로직 등
// 노드를 리스트나 트리로 변환하거나 다른 복잡한 구조 변경
// 이 과정은 synchronized 블록 내에서 수행
}
}
return null;
}