posts - 225, comments - 62, trackbacks - 0, articles - 0
   :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理
#include <semaphore.h>
#include 
<atomic>
#include 
<assert.h>
#include 
<pthread.h>
#include 
<thread>

#include 
<unistd.h>
#include 
<sys/time.h>

// 多生产者单消费者无锁共享队列
template<typename T>
class shared_queue
{
    shared_queue
& operator=(const shared_queue&= delete;
    shared_queue(
const shared_queue& other) = delete;

public:
    shared_queue(size_t max_size) {
        _capacity 
= max_size + 1;
        
// 把capacity对齐到2的幂, 为了快速用&mask替代%capacity操作
        if ((_capacity & (_capacity - 1)) != 0)
        {
            size_t n 
= _capacity;
            
while (n) {
                _capacity 
= n;
                n 
&= n - 1;
            }
            _capacity 
<<= 1;
        }
        
// 断言_capacity是比传入max_size大的最小的2的幂
        assert(_capacity != 0 && (_capacity & (_capacity - 1)) == 0);
        assert((_capacity 
>> 1<= max_size && max_size < _capacity);
        _mask 
= _capacity - 1;

        _data 
= new T[_capacity];
        _used 
= new bool[_capacity];
        std::fill(_used, _used
+_capacity, false);
        _begin 
= _end = 0;
        sem_init(
&_readable_count, 00);
    }

    
~shared_queue() {
        sem_destroy(
&_readable_count);
    }

    
void push(T item) {
        uint32_t cur 
= _end++;
        cur 
&= _mask; // 等价于 cur %= _capacity;
        _data[cur] = item;
        _used[cur] 
= true;
        
        sem_post(
&_readable_count);
    }

    
void wait_and_pop(T& popped_item) {
        sem_wait(
&_readable_count);
        uint32_t cur 
= _begin++;
        cur 
&= _mask; // 等价于 cur %= _capacity;
        volatile const bool& used = _used[cur];
        
while (!used); // 忙等待,这是必须的,考虑两个线程同时调用push的情况,先进入的可能后结束
        // 我仍然不能保证现在的处理是100%正确的,或许在push中_data[cur]和_used[cur]的赋值之间需要加入内存屏障

        popped_item 
= std::move(_data[cur]);
        _used[cur] 
= false;
    }

private:
    T
* _data;
    
bool* _used;
    size_t _capacity;
    size_t _mask;

    uint32_t _begin;
    std::atomic
<uint32_t> _end;
    sem_t _readable_count;
};

shared_queue
<int> queue(128);

void producer1()
{
    
for(int i=0;i<100;i++) {
        queue.push(i);
        usleep(
1);
    }
}

void producer2()
{
    
for(int i=100;i<200;i++) {
        queue.push(i);
        usleep(
1);
    }
}

void consumer()
{
    
for(int i=0;i<200;i++) {
        
int x;
        queue.wait_and_pop(x);
        printf(
"%d ", x);
        fflush(stdout);
    }
}

int main()
{
    std::thread th1(producer1);
    std::thread th2(producer2);
    std::thread th3(consumer);

    th1.join();
    th2.join();
    th3.join();
}

Feedback

# re: 用std::atomic和semaphore实现的多生产者单消费者无锁队列  回复  更多评论   

2015-03-17 16:00 by leeco
此外,使用spinlock进行小范围加锁其实也不会增加多少开销,会比mutex轻量的多,而其实上文中使用的sem_post有潜在的较大开销,根据实验当消费者线程因sem_wait进入睡眠,而sem_post调用会将其唤醒的时候,调用sem_post将会有比较大的开销。
如果生产者是对开销非常敏感的,而消费者不一定要保证实时处理生产者发来的东西的话可以把消费者的循环改成,当没有东西需要处理的时候就让生产者睡一个固定时间,然后自己醒了再检查是否有东西需要处理。
while(_readable_count<=0) {
usleep(interval);
}
--_readable_count;
...

如果是多消费者的话,需要加锁成如下样子

lock;
while(_readable_count<=0) {
unlock;
usleep(interval);
lock;
}
--_readable_count;
...
unlock;
只有注册用户登录后才能发表评论。