STL Queue + mutex

범용 논블로킹 락프리 Queue와 비교하기 위해 SQL Queue에 mutex를 사용한 프로그램을 하나 더 만들기로 한다.

전체 소스코드

#include <iostream>
#include <chrono>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
using namespace std;
using namespace chrono;

class Position
{
public:
    int x;
    int y;

    Position()
        : x(0)
        , y(0)
    {
    }

    void Set(int posX, int posY)
    {
        x = posX;
        y = posY;
    }

    void Get(int* outX, int* outY)
    {
        *outX = x;
        *outY = y;
    }
};


queue<Position> g_stlQueue;

void QueueInit()
{
    while(!g_stlQueue.empty())
        g_stlQueue.pop();
}


mutex g_lock;
const int TEST_COUNT = 50000;
int gNumThreads = 0;

void ThreadFunc(int myThreadID)
{
    cout << "ThreadID: " << myThreadID << endl;

    Position setPos;
    Position getPos;

    for(int i = 0; i < TEST_COUNT / gNumThreads; ++i)
    {
        g_lock.lock();
        switch(rand() % 2)
        {
        case 0:
            setPos.Set(i, i);
            g_stlQueue.push(setPos);
            break;

        case 1:
            if(!g_stlQueue.empty())
            {
                getPos = g_stlQueue.front();
                g_stlQueue.pop();
            }
            break;

        default:
            cout << "ERROR\n";
            exit(-1);
        }
        g_lock.unlock();
    }
}


const int MAX_THREADS = 64;
_declspec(thread) int ThreadID;

void main()
{
    vector<thread*> threadVector;
    for(int i = 1; i <= MAX_THREADS; i *= 2)
    {
        gNumThreads = i;
        QueueInit();
        threadVector.clear();

        auto startTime = high_resolution_clock::now();
        for(int j = 0; j < i; ++j)
        {
            threadVector.push_back(new thread{ThreadFunc, j});
        }

        for(auto t : threadVector)
        {
            t->join();
        }
        auto durationTime = high_resolution_clock::now() - startTime;
        cout << "스레드 수: " << i << ", 걸린 시간: " << duration_cast<milliseconds>(durationTime).count() << endl;
    }
    getchar();
}

동작 자체는 기존의 예제와 동일하게 스레드 수를 늘려가면서 Queue에 값을 Get, Set 하는 예제이다. 자료구조 코드는 STL Queue사용하기에 줄어들었으며, 대신 C++11 mutex를 사용하기 위한 코드가 추가되었다.

실행 결과

스레드 수 LF Queue(ms) mutex Queue(ms)
1 5516 5
2 3180 12
4 1885 10
8 1167 64
16 1298 10
32 1239 30
64 1094 52

우리가 갖은 문제발생의 원인이자 엄청난 오버해드의 장본인인 것처럼 매도했던 Lock쪽이 락프리 알고리즘보다 훨씬 성능이 좋은것을 확인할 수 있다. 내친김에 Windwos에서 제공하는 임계영역 (Critical Section)도 테스트 해 보자.

STL Queue + 임계 영역(Critical Section)

전체 소스코드

#include <iostream>
#include <chrono>
#include <vector>
#include <thread>
#include <queue>
#include <Windows.h>
using namespace std;
using namespace chrono;

class Position
{
public:
    int x;
    int y;

    Position()
        : x(0)
        , y(0)
    {
    }

    void Set(int posX, int posY)
    {
        x = posX;
        y = posY;
    }

    void Get(int* outX, int* outY)
    {
        *outX = x;
        *outY = y;
    }
};


CRITICAL_SECTION g_cs;

void InitCS()
{
    InitializeCriticalSection(&g_cs);
}


queue<Position> g_stlQueue;

void QueueInit()
{
    while(!g_stlQueue.empty())
        g_stlQueue.pop();
}


const int TEST_COUNT = 50000;
int gNumThreads = 0;

void ThreadFunc(int myThreadID)
{
    cout << "ThreadID: " << myThreadID << endl;

    Position setPos;
    Position getPos;

    for(int i = 0; i < TEST_COUNT / gNumThreads; ++i)
    {
        EnterCriticalSection(&g_cs);
        switch(rand() % 2)
        {
        case 0:
            setPos.Set(i, i);
            g_stlQueue.push(setPos);
            break;

        case 1:
            if(!g_stlQueue.empty())
            {
                getPos = g_stlQueue.front();
                g_stlQueue.pop();
            }
            break;

        default:
            cout << "ERROR\n";
            exit(-1);
        }
        LeaveCriticalSection(&g_cs);
    }
}


const int MAX_THREADS = 64;
_declspec(thread) int ThreadID;

void main()
{
    InitCS();

    vector<thread*> threadVector;
    for(int i = 1; i <= MAX_THREADS; i *= 2)
    {
        gNumThreads = i;
        QueueInit();
        threadVector.clear();

        auto startTime = high_resolution_clock::now();
        for(int j = 0; j < i; ++j)
        {
            threadVector.push_back(new thread{ThreadFunc, j});
        }

        for(auto t : threadVector)
        {
            t->join();
        }
        auto durationTime = high_resolution_clock::now() - startTime;
        cout << "스레드 수: " << i << ", 걸린 시간: " << duration_cast<milliseconds>(durationTime).count() << endl;
    }

    getchar();
}

위의 예제에서 mutex관련 부분을 제거하고 대신 임계영역을 사용하기 위한 코드를 추가하였다.

실행 결과

스레드 수 LF Queue(ms) mutex Queue(ms) CS Queue(ms)
1 5516 5 4
2 3180 12 3
4 1885 10 3
8 1167 64 9
16 1298 10 10
32 1239 30 23
64 1094 52 45

임계 영역을 사용했을때 결과는 Lock을 사용했을때 보다 아주 근소하게 더 나았다. 그렇다면 락프리 알고리즘보다 Lock을 거는 방식이 더욱 안전하고 좋다는 것일까? 그렇지 않다. 우리가 멀티스레드 프로그래밍을 하는 이유는 멀티스레드로 하여금 성능상 이득을 보기 위해서인데 위 결과에서 스레드 수가 증가함에 따라 좋은 성능을 보여주는 프로그램은 락프리 알고리즘 뿐이다.

그렇다면 락프리 Queue는 어떤점이 문제였을까? 바로 범용성이다. 범용적이면서 락프리등급을 유지하기 위해서 이 알고리즘은 자료구조에 변화가 있을 때마다 스레드 지역변수에서 지금까지 자료구조에 적용했던 히스토리를 다시 적용한다. 이러한 행동은 CAS연산에 의해 다른 스레드와 독립적으로 처리될 수 있지만 Lock보다 심한 오버해드를 발생시키는 원인이 된다.

마무리

여기까지 와서 싱글스레드 프로그래밍으로 돌아갈 수는 없다. 그렇다고 블로킹 알고리즘으로 개발하는것은 멀티스레드 프로그램의 목적인 스레드 증가에 따른 성능 향상을 포기하는 처사이다. 따라서 우리는 락프리 알고리즘을 제대로 구현해서 효율적인 멀티스레드 프로그래밍을 할 수 있는 방법을 찾아야 한다.

조금 더 알아보기 - 멀티스레드 프로그램에서 그냥 STL Queue 사용

위 예제들에서 lock부분을 주석하여 실행하면 멀티스레드 상황에서 STL 컨테이너를 그냥 사용했을때 발생할 수 있는 문제를 체험할 수 있다.

여기서 더욱 골치아픈 점은 뒤에 살짝 보이는 진행 정도가 매번 다르다는 것이며, 때때로 프로그램이 정상 수행되기도 한다는 점이다. 무조건 발생하는 버그는 재현도 쉽고 고치기뒤 쉽다. 하지만 멀티스레드 프로그램에서 발생할 수 있는 버그는 간혈적으로 발생한다는 점에서 프로그래머를 힘들게 한다.

조금 더 알아보기 - 뮤텍스와 임계 영역의 차이

앞선 예제에서 뮤텍스보다 임계 영역이 근소하게 성능이 좋은것을 확인할 수 있었다. 이는 멀티스레드 프로그램에서 발생되는 불확실성에 대한 우연의 결과일까? 그 차이점을 알아보자.

먼저 임계 영역은 프로그램 코드이다. 스레드가 임계 영역 안으로 진입할때는 단순히 lock-count의 증가 혹은 감소를 수행하며 문맥전환은 일어나지 않는다. 따라서 임계영역은 가볍고 빠르고 쓰기 쉽다.

반면 뮤텍스는 임계 영역에 비해 느리고 무겁지만 명명될 수 있으며, 프로세스가 서로 달라도 사용될 수 있다.

출처: Windows CS 팀블로그