논 블로킹 알고리즘 Non-Blocking Algorithm

논 블로킹 알고리즘이란 자료구조에 접근하는 것에 원자성을 보장하며 상대 스레드에 의존적이지 않는 구현방식을 가진 알고리즘을 말한다. 다른 스레드에 의존적이지 않다는 것은 결과적으로 Lock을 걸 필요가 없다는 것이다. 만약 자료구조들을 논 블로킹 알고리즘으로 구현한다면 Lock에 의한 오버해드도 없으면서 그와 동시에 Lock사용으로 인한 새로운 문제들에 대해서도 자유로운 효율적인 멀티스레드 자료구조가 될 것이다.

논 블로킹 알고리즘의 등급

일반적으로 알고리즘의 등급은 동작 방식에 따라 수학적으로 증명할 수 있는 Big O 표기법으로 나타낸다. 이와 비슷하게 논 블로킹 알고리즘은 동작 방식에 따라 다음과 같이 분류된다.

알고리즘의 등급 O(1) > O(n) > O(n log n ) > O(N^2)

무대기 등급 Wait-Freedom

  • 여러 스레드에서 수행중인 모든 메서드가 정해진 유한한 단계에 실행을 끝마치게 되는 구현방식

무잠금 등급 Lock-Freedom

  • 여러 스레드에서 수행중인 메서드들 중 적어도 한 개의 메서드가 유한한 단계에 실행을 끝마치게 되는 구현방식
  • 때때로 기아 현상이 유발될 수 있다.
  • 무 대기등급에 비해 오버해드가 적고, 쉽게 구현할 수 있기 때문에 성능을 위해 무 잠금등급으로 구현하는 경우가 많다.

이후 자주 언급될 것이기 때문에 편의상 락프리라고 지칭하겠다.

무간섭 Obstruction-Freedom

  • 한 개를 제외한 모든 스레드가 멈췄을 때, 멈추지 않ㅇ른 스레드의 메서드가 유한한 단계에 끝마치게 되는 구현방식
  • 충돌중인 스레드를 잠깐 멈춰야 할 필요가 있다.

많은 사람들이 Lock-free로 부르는 무 잠금알고리즘은 논 블로킹 알고리즘의 일종이다.

CAS - Compare And Swap

기존의 CPU연산인 Read와 Store만으로는 기존의 알고리즘을 논 블로킹 알고리즘으로 바꿀 수 없다.

Read와 Store만으로 기존의 알고리즘을 논 블로킹 알고리즘으로 바꿀 수 없다는것은 이미 증명되어 있다. 그러나 증명과정에 대해 서술하는것은 이 책의 목적과 맞지 않는다고 생각하여 생략한다.

그러나 Compare And Swap 또는 Compare And Set으로 불리는 CAS연산이 제공된다면 논 블로킹 알고리즘의 구현이 가능해진다. CAS는 기본적으로 메모리의 값이 기대값과 같다면 업데이트 후 true를 반환하고, 기대값과 다르다면 false를 반환하는 연산이다. 이 과정 또한 당연히 원자적이며 무대기로 동작한다.

Windows의 CAS연산

Windows에서는 CAS연산을 InterlockedCompareExchange라는 API로 제공한다.

LONG __cdecl InterlockedCompareExchange (
    _Inout_ LONG volatile *Destination,
    _In_ LONG Exchange,
    _In_ LONG Comparand
);
  • return: 대상 위치의 초기값 (일반적인 CAS와는 다소 다른 부분)
  • *Destination: 대상 위치
  • Exchange: 교체될 값
  • Comparand: 비교할 값

뿐만 아니라 Windows에서는 Interlocked...로 시작하는 다양한 CAS연산을 제공한다.

리눅스의 CAS연산

리눅스에서 CAS연산은 다음과 같이 구현될 수 있다. 스텍오버플로우 참조

#include <stdbool.h>
bool CAS(int* ptr, int oldval, int newval)
{
    return __sync_bool_compare_and_swap(ptr, oldval, newval);
}
  • ptr의 주소에 있는 값이
  • oldval과 같다면
  • newval로 교체하고 true를 반환
  • 다르다면 false를 반환한다.

C++11의 CAS 연산

스래드의 활용 및 동기화에 대한 주제가 대거 추가된 C++11에서도 CAS연산을 정식으로 지원한다. compare_exchange_strong

bool compare_exchange_strong (
    T& tst_val,
    T& new_val,
);

std::atomic::compare_exchange_strong 메서드는 std::atomic<T>로 선언된 변수의 멤버 메서드이다. 사용법이 다소 달라보일 수 있으나 본질적으로 CAS와 같다.

C++11은 CAS를 사용하기 위해 std::atomic를 사용하도록 권장하고 있다. 그러나 volatile로 선언된 기존의 변수들도 캐스팅을 통해 위 메서드를 사용할 수 있다.

  • std::atomic의 값이
  • tst_val과 같다면
  • new_val로 교체하고 true를 반환
  • 다르다면 false를 반환한다.

범용 논블로킹 락프리 Queue

전체 소스코드

#include <iostream>
#include <chrono>
#include <vector>
#include <thread>
#include <atomic>
using namespace std;
using namespace chrono;

class Position
{
public:
    int x;
    int y;

    Position()
        : x(0)
        , y(0)
    {
    }

    void Set(int posX, int posY)
    {
        x = posX;
        y = posY;
    }

    void Get(int* outX, int* outY)
    {
        *outX = x;
        *outY = y;
    }
};


enum METHOD
{
    SET,
    GET
};

class Invocation
{
public:
    Position pos;
    METHOD method;
};

class Response
{
public:
    Position pos;
};


class Node
{
public:
    Invocation invoc;
    Node* pNext = nullptr;
    int seqNum = 0;

    Node()
        : pNext(nullptr)
        , seqNum(0)
    {
    }

    Node(Invocation inv)
        : invoc(inv)
        , pNext(nullptr)
        , seqNum(0)
    {
    }
};

class SeqObject
{
private:
    Position pos;

public:
    Response Apply(Invocation inv)
    {
        Response response;
        switch(inv.method)
        {
        case METHOD::SET:
            pos.x = inv.pos.x;
            pos.y = inv.pos.y;
            break;

        case METHOD::GET:
            response.pos.x = pos.x;
            response.pos.y = pos.y;
            break;

        default:
            cout << "ERROR!\n";
            exit(-1);
            break;
        }
        return response;
    }
};


const int MAX_THREADS = 64;
_declspec(thread) int ThreadID;

class UniversalLFA
{
private:
    Node* nodeArray[MAX_THREADS];
    Node* pTail;

    void NodeArrayInit()
    {
        pTail = new Node();
        pTail->seqNum = 1;
        for(int i = 0; i < MAX_THREADS; ++i)
        {
            nodeArray[i] = pTail;
        }
    }

    Node* GetLastNode()
    {
        int lastSeqNum = nodeArray[0]->seqNum;
        int retNodeIndex = 0;
        for(int i = 1; i < MAX_THREADS; ++i)
        {
            if(nodeArray[i]->seqNum > lastSeqNum)
            {
                lastSeqNum = nodeArray[i]->seqNum;
                retNodeIndex = i;
            }
        }
        return nodeArray[retNodeIndex];
    }

    Node* CAS_and_GET(Node** addr, Node* oldNode, Node* newNode)
    {
        atomic_compare_exchange_strong(
            reinterpret_cast<atomic_int*>(addr),
            reinterpret_cast<int*>(&oldNode),
            reinterpret_cast<int>(newNode)
            );
        return *addr;
    }

public:
    UniversalLFA()
    {
        NodeArrayInit();
    }

    void Init()
    {
        Node* pLastNode = GetLastNode();
        while(pLastNode != nullptr)
        {
            Node* temp = pLastNode;
            pLastNode = pLastNode->pNext;
            delete temp;
        }
        NodeArrayInit();
    }

    Response Apply(Invocation invoc)
    {
        int id = ThreadID;
        Node* pPrefer = new Node(invoc);
        while(pPrefer->seqNum == 0)
        {
            Node* pBefore = GetLastNode();
            Node* pAfter = CAS_and_GET(&(pBefore->pNext), nullptr, pPrefer);
            pBefore->pNext = pAfter;
            pAfter->seqNum = pBefore->seqNum + 1;
            nodeArray[id] = pAfter;
        }
        SeqObject myObject;
        Node* pCurrent = pTail->pNext;
        while(pCurrent != pPrefer)
        {
            myObject.Apply(pCurrent->invoc);
            pCurrent = pCurrent->pNext;
        }
        return myObject.Apply(pCurrent->invoc);
    }
};


const int TEST_COUNT = 50000;
int gNumThreads = 0;
UniversalLFA g_lfQueue;

void ThreadFunc(int myThreadID)
{
    cout << "ThreadID: " << myThreadID << endl;

    Invocation invoc;
    Response res;

    for(int i = 0; i < TEST_COUNT / gNumThreads; ++i)
    {
        switch(rand() % 2)
        {
        case 0:
            invoc.pos.Set(i, i);
            invoc.method = METHOD::SET;
            res = g_lfQueue.Apply(invoc);
            break;

        case 1:
            invoc.method = METHOD::GET;
            res = g_lfQueue.Apply(invoc);
            break;

        default:
            cout << "ERROR\n";
            exit(-1);
        }
    }
}


void main()
{
    vector<thread*> threadVector;
    for(int i = 1; i <= MAX_THREADS; i *= 2)
    {
        gNumThreads = i;
        g_lfQueue.Init();
        threadVector.clear();

        auto startTime = high_resolution_clock::now();
        for(int j = 0; j < i; ++j)
        {
            threadVector.push_back(new thread{ThreadFunc, j});
        }

        for(auto t : threadVector)
        {
            t->join();
        }
        auto durationTime = high_resolution_clock::now() - startTime;
        cout << "스레드 수: " << i << ", 걸린 시간: " << duration_cast<milliseconds>(durationTime).count() << endl;
    }
    getchar();
}

지금까지의 간단한 예제와 다르게 전체 소스코드가 제법 길고 여러 클래스로 구성되어 있다. 코드의 자세한 분석 및 문제점 진단은 다음 장에서 계속하도록 한다.

조금 더 알아보기 - _declspec

이 키워드는 다음에 선언되는 것의 저장소 정보를 정의할 수 있게 해주는 키워드이다. 여기서는 이해를 돕기 위해 자주 사용되는 몇가지를 살펴본다. 이 키워드의 더 자세한 정보는 MSDN __declspec에서 확인할 수 있다.

__declspec(align(#))

이 키워드는 뒤에 선언된 데이터타입의 메모리 정렬을 지정한다.

#include <iostream>
using namespace std;

_declspec(align(32)) struct str1{ int a,b,c,d,e; }
struct str2{ int a,b,c,d,e; }

void main()
{
    cout << "str1: " << sizeof(str1) << endl;
    cout << "str2: " << sizeof(str2) << endl;
}

결과는 str1의 크기는 32로 맞춰지며 str2는 int가 5개이므로 20으로 출력된다. 만약 align(#)의 1부터 8192까지 가능하며, 2의 배수가 아니라면 컴파일시 에러가 난다. 더욱 자세한 정보는 MSDN align에서 확인할 수 있다.

declspec(dllimport), declspec(dllexport)

DLL 개발을 해본적이 있다면 한번쯤 봤을 키워드이다. 보통은 이 키워드를 직접 사용하기보다 define하여 사용한다.

#define DllImport   __declspec( dllimport )
#define DllExport   __declspec( dllexport )

DllExport void func();
DllExport int i = 10;
DllImport int j;
DllExport int n;

DLL 프로젝트 개발에 대한 내용은 이 책의 범위를 넘어서는 것이므로 생략한다. 더욱 자세한 정보는 MSDN dllimport, dllexport에서 확인할 수 있다.

__declspec(thread)

이 키워드는 뒤에 선언된 데이터타입을 스래드 로컬 저장소로 지정한다. 이번 예제에서 쓰인것으로 TLS(Thread Local Storage)라고 불리기도 한다. 위 그림에서 Thread1과 Thread2모두 각각의 TLS를 가지고 있는것을 확인할 수 있다. 사용에 대한 내용은 MSDN thread에서 확인할 수 있으며 TLS에 대한 자세한 정보는 MSDN TLS에서 볼 수 있다.