범용 논블로킹 락프리 Queue

이 소스코드는 학습 편의성을 위해 간단한 Queue로 구현되어 있다.

간단하다곤 하였지만 지금까지 간단한 예제와는 다르게 다소 복잡하며 왜 Queue를 이렇게 구현하였는지 쉽게 이해가 안되는 부분도 있을것이다. 이유를 알기 위해서는 알고리즘의 본질적인 구조를 이해해야 한다. 이해한다면 거의 모든 자료구조들을 락프리로 구현할 수 있을 것이다.

Position 클래스

class Position
{
public:
    int x;
    int y;

    Position()
        : x(0)
        , y(0)
    {
    }

    void Set(int posX, int posY)
    {
        x = posX;
        y = posY;
    }

    void Get(int* outX, int* outY)
    {
        *outX = x;
        *outY = y;
    }
};

먼저 Queue에 들어갈 데이터 클래스를 살펴보자. 맴버변수로 x와 y값을 가지고 있으며 값에 접근하기 위한 Get메서드와 Set메서드를 제공한다. 조금 특별한 점이라면 Get메서드의 값 반환 방식이 포인터로 전달된다는 점일 것이다.

Invocation과 Response 클래스

enum METHOD
{
    SET,
    GET
};

class Invocation
{
public:
    Position pos;
    METHOD method;
};

class Response
{
public:
    Position pos;
};

두 클래스 모두 Position클래스의 값을 표현할 수 있어야 하므로 Position클래스를 가지고 있다. 여기에 Invocation(호출) 클래스는 어떤 방식으로 동작할지에 대한 상태인 METHOD를 추가로 가지고 있는데 만약 Queue보다 더 다양한 기능을 해야하는 알고리즘이라면 METHOD의 상태가 더 많이 필요할 것이다.

Node 클래스

class Node
{
public:
    Invocation invoc;
    Node* pNext = nullptr;
    int seqNum = 0;

    Node()
        : pNext(nullptr)
        , seqNum(0)
    {
    }

    Node(Invocation inv)
        : invoc(inv)
        , pNext(nullptr)
        , seqNum(0)
    {
    }
};

이 클래스는 외부에서 알고리즘에 어떤 일을 시킬지에 대한 입력을 의미한다. 각각의 스레드는 알고리즘에 적용하고 싶은 일을 Node.Invocation에 담아 요청하게 된다.

SeqObject 클래스

class SeqObject
{
private:
    Position pos;

public:
    Response Apply(Invocation inv)
    {
        Response response;
        switch(inv.method)
        {
        case METHOD::SET:
            pos.x = inv.pos.x;
            pos.y = inv.pos.y;
            break;

        case METHOD::GET:
            response.pos.x = pos.x;
            response.pos.y = pos.y;
            break;

        default:
            cout << "ERROR!\n";
            exit(-1);
            break;
        }
        return response;
    }
};

SeqObject(순서)클래스는 내부에 Position클래스를 가지고 있으며 Apply시 들어오는 Invocation의 내용에 맞게 자신의 값을 Set 또는 Get 한다. 결과값은 Response클래스의 형태로 반환된다.

UniversalLFA 클래스

이번 장의 핵심인 UniversalLFA (Universal Lock-Free Algorithm 만능 락프리 알고리즘) 클래스이다. 코드가 다소 길고 복잡한 동작을 담고 있으므로 끊어서 살펴보도록 하자.

UniversalLFA 클래스 - 맴버변수, 생성자

private:
    Node* nodeArray[MAX_THREADS];
    Node* pTail;

private:
    void NodeArrayInit()
    {
        pTail = new Node();
        pTail->seqNum = 1;
        for(int i = 0; i < MAX_THREADS; ++i)
        {
            nodeArray[i] = pTail;
        }
    }

public:
    UniversalLFA()
    {
        NodeArrayInit();
    }

우선 스래드의 수 만큼의 길이를 가지는 nodeArray를 보자. 각각의 스래드는 요청을 하고 그 요청에 대한 결과를 받아가는 일을 하게 된다. 특정한 경우 모든 스레드가 동시에 요청할 수도 있을 것이며 nodeArray배열의 길이는 이를 위한 공간이다. pTailNodeArrayInit()은 pTail의 값을 1로 설정하고 nodeArray의 모든 원소를 pTail로 지정하는 일을 한다. 기본 생성자에서는 NodeArrayInit()만을 처리하여 자료구조를 초기화 한다.

UniversalLFA 클래스 - 초기화

private:
    void Init()
    {
        Node* pLastNode = GetLastNode();
        while(pLastNode != nullptr)
        {
            Node* temp = pLastNode;
            pLastNode = pLastNode->pNext;
            delete temp;
        }
        NodeArrayInit();
    }

private:
    Node* GetMaxNode()
    {
        int maxSeqNum = nodeArray[0]->seqNum;
        int maxSeqIndex = 0;
        for(int i = 1; i < MAX_THREADS; ++i)
        {
            if(nodeArray[i]->seqNum > maxSeqNum)
            {
                maxSeqNum = nodeArray[i]->seqNum;
                maxSeqIndex = i;
            }
        }
        return nodeArray[maxSeqIndex];
    }

이 테스트의 main()에서는 기존의 예제들과 같이 스래드의 수를 점차 증가시키며 테스트를 반복한다. 이때 자료구조의 재사용을 위한 Init()메서드이다. 이 메서드에서는 seqNum이 가장 큰 노드부터 nullptr이 나타날때까지 노드들을 순회하면서 delete한 뒤 NodeArrayInit()을 호출하고 있다.

UniversalLFA 클래스 - CAS_and_GET 메서드

private:
    Node* CAS_and_GET(Node** addr, Node* oldNode, Node* newNode)
    {
        atomic_compare_exchange_strong(
            reinterpret_cast<atomic_int*>(addr),
            reinterpret_cast<int*>(&oldNode),
            reinterpret_cast<int>(newNode)
            );
        return *addr;
    }

논 블로킹 알고리즘을 구현하려면 CAS연산이 필요하다. UniversalLFA에서는 CAS연산을 위와같은 방식으로 사용하고 있다. addr의 주소에 있는 내용이 oldNode의 것과 같다면 newNode로 교체하고 변경된 주소값을 반환하는데 만약 같지 않다면 값이 변경되지 않은 addr의 주소가 반환되게 된다.

UniversalLFA 클래스 - Apply 메서드

public:
    Response Apply(Invocation invoc)
    {
        int id = ThreadID;
        Node* pPrefer = new Node(invoc);
        while(pPrefer->seqNum == 0)
        {
            Node* pBefore = GetLastNode();
            Node* pAfter = CAS_and_GET(&(pBefore->pNext), nullptr, pPrefer);
            pBefore->pNext = pAfter;
            pAfter->seqNum = pBefore->seqNum + 1;
            nodeArray[id] = pAfter;
        }
        SeqObject myObject;
        Node* pCurrent = pTail->pNext;
        while(pCurrent != pPrefer)
        {
            myObject.Apply(pCurrent->invoc);
            pCurrent = pCurrent->pNext;
        }
        return myObject.Apply(pCurrent->invoc);
    }

특정 스레드는 Apply메서드를 이용해 자료구조에 Invocation(요청)할 수 있다. 내부에서는 전달받은 invoc로 Node를 생성하게 되는데 처음에는 seqNum이 0이므로 무조건 while문으로 진입한다. seqNum이 가장 큰 마지막 노드를 얻어오고 그 다음 노드와 nullptr로 CAS_and_GET()을 호출한다.

CAS_and_GET()이 성공한 경우

pBefore->pNext가 invoc로부터 생성된 pPrefer로 설정되며, 반환된 pAfterpPrefer는 결과적으로 같은 주소를 가리키게 된다. pAfter의 seqNum을 1늘려서 설정하고 해당 스레드의 nodeArray[id]는 처리가 성공한 pAfter를 가리키게 된다.

CAS_and_GET()이 실패한 경우

CAS_and_GET()이 실패한다면 반환되어 돌아온 pAfter는 마지막노드의 pNext인 체로 변하지 않았다. 따라서 pPrefer->seqNum은 바뀌지 않은체로 0이며 whlie문을 탈출하지 못하고 루프하게된다.

CAS_and_GET()이 실패하는 원인

이 스레드가 자신의 invoc를 처리하는 도중 다른 스레드가 먼저 자신의 invoc를 처리하는 바람에 pBefore->pNext가 바뀌게 되었고, 뒤늦게 이 스레드에서 CAS연산을 수행했을 때 pBefore->pNext가 nullptr이 아니게 되었기 때문이다.

myObject.Apply()

CAS_and_GET()의 호출이 성공하고 while문에서 탈출한 뒤부터가 이 알고리즘의 핵심이라 할 수 있다. 코드만 봤을때 이러한 구현방식은 얼핏 이해하기 어렵다. 그러나 그 의미를 해석하면 다음과 같다.

  1. CAS_and_GET() 성공
  2. 지금까지 모든 스레드들이 요청했던 node들을 순차적으로 로컬 자료구조 myObject에 적용
  3. 최종적으로 자신의 스레드에서 작업완료한 node를 myObject에 적용하고 반환

따라서 이 알고리즘은 어떤 스레드가 자신의 invoc를 처리하려 할 때마다 자신의 로컬 SeqObject에 지금까지 있었던 모든 변경내용을 다 적용하고나서 자신의 처리내용을 덧붙인다.

이렇게 다른 스레드의 작업을 기다리지 않는 논 블로킹 알고리즘을 구현하였다. 실제로 소스코드 어디에도 Lock을 사용하지 않았으며, 여러 스레드에서 수행중인 메서드들 중 적어도 한 개의 메서드가 유한한 단계에 실행을 끝마치게 되는 락프리알고리즘이 되었다.

ThreadFunc()와 main()

const int TEST_COUNT = 50000;
int gNumThreads = 0;
UniversalLFA g_lfQueue;

void ThreadFunc(int myThreadID)
{
    cout << "ThreadID: " << myThreadID << endl;

    Invocation invoc;
    Response res;

    for(int i = 0; i < TEST_COUNT / gNumThreads; ++i)
    {
        switch(rand() % 2)
        {
        case 0:
            invoc.pos.Set(i, i);
            invoc.method = METHOD::SET;
            res = g_lfQueue.Apply(invoc);
            break;

        case 1:
            invoc.method = METHOD::GET;
            res = g_lfQueue.Apply(invoc);
            break;

        default:
            cout << "ERROR\n";
            exit(-1);
        }
    }
}

void main()
{
    vector<thread*> threadVector;
    for(int i = 1; i <= MAX_THREADS; i *= 2)
    {
        gNumThreads = i;
        g_lfQueue.Init();
        threadVector.clear();

        auto startTime = high_resolution_clock::now();
        for(int j = 0; j < i; ++j)
        {
            threadVector.push_back(new thread{ThreadFunc, j});
        }

        for(auto t : threadVector)
        {
            t->join();
        }
        auto durationTime = high_resolution_clock::now() - startTime;
        cout << "스레드 수: " << i << ", 걸린 시간: " << duration_cast<milliseconds>(durationTime).count() << endl;
    }
    getchar();
}

스레드 메서드와 main()의 동작은 지금까지와 유사하다. 스레드 메서드에는 현재 작업을 처리중인 ThreadID를 출력하며, TEST_COUNT / 스레드 수만큼 랜덤하게 GET이나 SET을 하는 invoc를 자료구조에 요청하는 일을 하며 main()에서는 스레드 수를 증가시켜가며 테스트를 진행하게 된다.

실행 결과

스레드 수 LF Queue(ms)
1 5535
2 3200
4 1726
8 1493
16 1163
32 1087
64 1112

싱글 스레드에서 5초가 걸렸던 작업이 스레드 수를 증가시켜감에 따라 점차 줄어들어 최종적으로는 1초남짓밖에 걸리지 않았다. 64스레드부터 오히려 처리속도가 오래걸리는것은 스레드간의 경합에 의한 과부하일 것이다. 그러나 겨우 50,000번의 큐 작업을 하는데 저정도의 시간이 필요하다는것은 다소 의문이 든다.