您现在的位置是:网站首页>技术百科技术百科

无锁有序链表的实现

小大寒2024-01-01[技术百科]博学多闻

无锁有序链表的实现无锁有序链表可以保证元素的唯一性,使其可用于哈希表的桶,甚至直接作为一个效率不那么高的map。普通链表的无锁实现相对简单点,因为插入元素可以在表头插,而有序链表的插入则是任意位置。通过合理使用CAS和mark标志位,结合hazard pointer,我们能够在并发环境下安全地操作无锁有序链表。其最关键的部分是准确地判断和处理内存是否被释放、标记等问题,避免在多个线程中出现内存泄漏或访问无效内存的情况。 本文主要基于论文 High Performance Dynamic Lock-Free Hash Tables 实现。

无锁有序链表的实现

无锁有序链表可以保证元素的唯一性,使其可用于哈希表的桶,甚至直接作为一个效率不那么高的map。普通链表的无锁实现相对简单点,因为插入元素可以在表头插,而有序链表的插入则是任意位置。

主要问题

链表的主要操作包含insertremove,先简单实现一个版本,就会看到问题所在,以下代码只用作示例:

        struct node_t {
            key_t key;
            value_t val;
            node_t *next;
        };
        
        int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
            node_t *pred = head;
            node_t *item = head->next;
            while (item) {
                int d = KEY_CMP(item->key, key);
                if (d >= 0) {
                    *pred_ptr = pred;
                    *item_ptr = item;
                    return d == 0 ? TRUE : FALSE;
                }
                pred = item;
                item = item->next;
            }
            *pred_ptr = pred;
            *item_ptr = NULL;
            return FALSE;
        }
        
        int l_insert(node_t *head, key_t key, value_t val) {
            node_t *pred, *item, *new_item;
            while (TRUE) {
                if (l_find(&pred, &item, head, key)) {
                    return FALSE;
                }
                new_item = (node_t*) malloc(sizeof(node_t));
                new_item->key = key;
                new_item->val = val;
                new_item->next = item;
                // A. 如果pred本身被移除了
                if (CAS(&pred->next, item, new_item)) {
                    return TRUE;
                }
                free(new_item);
            }
        }
        
        int l_remove(node_t *head, key_t key) {
            node_t *pred, *item;
            while (TRUE) {
                if (!l_find(&pred, &item, head, key)) {
                    return TRUE;
                }
                // B. 如果pred被移除;如果item也被移除
                if (CAS(&pred->next, item, item->next)) {
                    haz_free(item);
                    return TRUE;
                }
            }
        }
        

l_find函数返回查找到的前序元素和元素本身,代码A和B虽然拿到了preditem,但在CAS的时候,其可能被其他线程移除。甚至,在l_find过程中,其每一个元素都可能被移除。问题在于,任何时候拿到一个元素时,都不确定其是否还有效。元素的有效性包括其是否还在链表中,其指向的内存是否还有效。

我认为CAS有必要详细讲解一下,为了不扰乱主流程,放到文末完整代码后面。

解决方案

通过为元素指针增加一个有效性标志位,配合CAS操作的互斥性,就可以解决元素有效性判定问题。

因为node_t放在内存中是会对齐的,所以指向node_t的指针值低几位是不会用到的,从而可以在低几位里设置标志,这样在做CAS的时候,就实现了DCAS的效果,相当于将两个逻辑上的操作变成了一个原子操作。想象下引用计数对象的线程安全性,其内包装的指针是线程安全的,但对象本身不是。

CAS的互斥性,在若干个线程CAS相同的对象时,只有一个线程会成功,失败的线程就可以以此判定目标对象发生了变更。改进后的代码(代码仅做示例用,不保证正确):

        typedef size_t markable_t;
        // 最低位置1,表示元素被删除
        #define HAS_MARK(p) ((markable_t)p & 0x01)
        #define MARK(p) ((markable_t)p | 0x01)
        #define STRIP_MARK(p) ((markable_t)p & ~0x01)
        
        int l_insert(node_t *head, key_t key, value_t val) {
            node_t *pred, *item, *new_item;
            while (TRUE) {
                if (l_find(&pred, &item, head, key)) { 
                    return FALSE;
                }
                new_item = (node_t*) malloc(sizeof(node_t));
                new_item->key = key;
                new_item->val = val;
                new_item->next = item;
                // A. 虽然find拿到了合法的pred,但是在以下代码之前pred可能被删除,此时pred->next被标记
                //    pred->next != item,该CAS会失败,失败后重试
                if (CAS(&pred->next, item, new_item)) {
                    return TRUE;
                }
                free(new_item);
            }
            return FALSE;
        }
        
        int l_remove(node_t *head, key_t key) {
            node_t *pred, *item;
            while (TRUE) {
                if (!l_find(&pred, &item, head, key)) {
                    return FALSE;
                }
                node_t *inext = item->next;
                // B. 删除item前先标记item->next,如果CAS失败,那么情况同insert一样,有其他线程在find之后
                //    删除了item,失败后重试
                if (!CAS(&item->next, inext, MARK(inext))) {
                    continue;
                }
                // C. 对同一个元素item删除时,只会有一个线程成功走到这里
                if (CAS(&pred->next, item, STRIP_MARK(item->next))) {
                    haz_defer_free(item);
                    return TRUE;
                }
            }
            return FALSE;
        }
        
        int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
            node_t *pred = head;
            node_t *item = head->next;
            hazard_t *hp1 = haz_get(0);
            hazard_t *hp2 = haz_get(1);
            while (item) {
                haz_set_ptr(hp1, pred);
                haz_set_ptr(hp2, item);
                /* 
                 如果已被标记,那么紧接着item可能被移除链表甚至释放,所以需要重头查找
                */
                if (HAS_MARK(item->next)) { 
                    return l_find(pred_ptr, item_ptr, head, key);
                }
                int d = KEY_CMP(item->key, key);
                if (d >= 0) {
                    *pred_ptr = pred;
                    *item_ptr = item;
                    return d == 0 ? TRUE : FALSE;
                }
                pred = item;
                item = item->next;
            }
            *pred_ptr = pred;
            *item_ptr = NULL;
            return FALSE;
        }
        

haz_gethaz_set_ptr之类的函数是一个hazard pointer实现,用于支持多线程下内存的GC。上面的代码中,要删除一个元素item时,会标记item->next,从而使得insert时中那个CAS不需要做任何调整。总结下这里的线程竞争情况:

  • insertfind到正常的preditempred->next == item,然后在CAS前有线程删除了pred,此时pred->next == MARK(item)CAS失败,重试;删除分为2种情况:a) 从链表移除,得到标记,pred可继续访问;b)pred可能被释放内存,此时再使用pred会错误。为了处理情况b,所以引入了类似hazard pointer的机制,可以有效保障任意一个指针p只要还有线程在使用它,它的内存就不会被真正释放
  • insert中有多个线程在pred后插入元素,此时同样由insert中的CAS保证,这个不多说
  • remove中情况同insertfind拿到了有效的prednext,但在CAS的时候pred被其他线程删除了,CAS失败

总结

通过合理使用CASmark标志位,结合hazard pointer,我们能够在并发环境下安全地操作无锁有序链表。其最关键的部分是准确地判断和处理内存是否被释放、标记等问题,避免在多个线程中出现内存泄漏或访问无效内存的情况。

代码展示

Makefile

common_defs.h

hazard_pointers.h

hazard_pointers.c

lock_free_list.h

lock_free_list.c

hazard_test.c

我知道看到这里你已经很头大了,那就再加把劲儿,把CAS原理也啃下来,这块很重要。

CAS(Compare-And-Swap) 详解

CAS(Compare-And-Swap,即比较并交换)是一种用于实现多线程并发控制的原子操作。它允许线程安全地更新某个变量的值,而无需使用锁。CAS 是现代多核处理器和并发编程中一种非常重要的机制。

工作原理

CAS 操作需要三个参数:

  1. 变量的内存地址(`ptr`):需要被更新的变量。
  2. 旧值(`expected`):当前线程期望该变量的值。
  3. 新值(`desired`):当前线程希望更新的值。

CAS 的执行逻辑:

  1. 读取 `ptr` 的当前值。
  2. 将当前值与 `expected`(旧值)进行比较:
    • 如果相等,表示变量没有被其他线程修改过,将 `ptr` 的值更新为 `desired`(新值)。
    • 如果不相等,表示变量已经被其他线程修改过,不执行更新操作。
  3. 返回操作是否成功。

伪代码

function CAS(ptr, expected, desired):
    if *ptr == expected:   # 比较
        *ptr = desired     # 交换
        return true        # 表示更新成功
    else:
        return false       # 表示更新失败
    

示例

假设我们有一个变量 counter,值为 10。线程希望将其更新为 20,前提是它的值仍然是 10

CAS 操作:

  • 初始值:counter = 10
  • CAS(counter, 10, 20)
    • 比较:counter == 10,成立。
    • 交换:将 counter 更新为 20
    • 返回:true(成功)。
  • counter 已被其他线程修改为 15
    • 比较:counter == 10,不成立。
    • 不交换:counter 保持为 15
    • 返回:false(失败)。

底层实现

CAS 通常由硬件支持,例如在 x86 架构上,可以通过以下指令实现:

  • CMPXCHG:比较并交换指令。

在高级语言中,CAS 通常通过内置的库函数或原子操作接口提供支持,例如:

  • C/C++:std::atomic 提供的 compare_exchange_weakcompare_exchange_strong
  • Java:AtomicInteger.compareAndSet
  • Go:sync/atomic.CompareAndSwap

C++ 示例

优点

  • 高效:无需使用锁,避免了线程阻塞。
  • 原子性:CAS 是由硬件直接支持的原子操作,线程安全。
  • 轻量级:与传统的锁机制相比,CAS 不需要切换线程上下文,性能更高。

局限性

  • ABA 问题

    当一个变量的值从 A 改变为 B,然后又变回 A,CAS 无法感知到中间的变化。

    解决办法:使用版本号。例如,将变量和版本号打包在一起,通过版本号区分中间变化。

    示例:

    text
        Initial: A -> B -> A
        CAS compares: A == A -> success (ignores the fact that it was modified to B in between)
                
  • 自旋问题

    如果多个线程频繁失败,会导致 CPU 忙于重试,浪费资源。

    解决办法:在必要时配合锁或线程让步机制。

  • 不可移植性

    CAS 的底层实现依赖于硬件支持,不同平台的实现可能不同。

应用场景

  • 无锁队列和无锁栈:CAS 是实现无锁数据结构(如环形队列、无锁栈)的核心。
  • 计数器:通过 CAS 实现线程安全的计数器,无需加锁。
  • 同步原语:构建复杂的同步机制(如信号量、互斥锁等)的基础。
  • 自旋锁:在实现轻量级的自旋锁时,可以使用 CAS 检测和修改锁状态。

实际案例

基于 CAS 的自旋锁实现

总结

CAS 是并发编程的基石,通过硬件支持的原子操作实现无锁编程。它的优势在于性能和线程安全,但使用时需注意 ABA 问题和自旋可能导致的性能开销。在需要高性能和高并发的场景中,CAS 是不可或缺的技术工具。

阅读完毕,很棒哦!

文章评论

站点信息

  • 网站地址:www.xiaodahan.com
  • 我的QQ: 3306916637