2008-10-15 7 views
6

Tôi muốn thực hiện một kịch bản sản xuất/tiêu dùng tuân theo giao diện mà là khoảng:Làm thế nào để thực hiện chặn đọc sử dụng POSIX đề

class Consumer { 
private: 
    vector<char> read(size_t n) { 
     // If the internal buffer has `n` elements, then dequeue them 
     // Otherwise wait for more data and try again 
    } 
public: 
    void run() { 
     read(10); 
     read(4839); 
     // etc 
    } 
    void feed(const vector<char> &more) { 
     // Safely queue the data 
     // Notify `read` that there is now more data 
    } 
}; 

Trong trường hợp này, feedrun sẽ chạy trên chủ đề riêng biệt và read nên là chặn đọc (như recvfread). Rõ ràng, tôi sẽ cần một số loại trừ lẫn nhau trên deque của tôi, và tôi sẽ cần một số loại hệ thống thông báo để thông báo read để thử lại.

Tôi nghe thấy biến điều kiện là cách để thực hiện, nhưng tất cả trải nghiệm đa luồng của tôi đều nằm trong Windows và có một khoảng thời gian khó khăn bao quanh đầu tôi.

Cảm ơn bạn đã trợ giúp!

(Vâng, tôi biết đó là hiệu quả để trở vectơ. Đừng đi vào đó.)

Trả lời

8

Mã này chưa sẵn sàng để sản xuất. Không kiểm tra lỗi được thực hiện trên kết quả của bất kỳ cuộc gọi thư viện nào.

Tôi đã khóa khóa/mở khóa của mutex trong LockThread vì vậy nó là ngoại lệ an toàn. Nhưng đó là về nó.

Ngoài ra nếu tôi đang thực hiện nghiêm túc, tôi sẽ bọc các biến mutex và điều kiện bên trong các đối tượng để chúng có thể bị lạm dụng bên trong các phương pháp khác của Người tiêu dùng. Nhưng miễn là bạn lưu ý rằng khóa phải được mua trước khi bạn sử dụng biến điều kiện (theo bất kỳ cách nào) thì tình huống đơn giản này có thể đứng như vậy.

Bạn quan tâm đến việc bạn đã kiểm tra thư viện luồng tăng cường?

#include <iostream> 
#include <vector> 
#include <pthread.h> 

class LockThread 
{ 
    public: 
    LockThread(pthread_mutex_t& m) 
     :mutex(m) 
    { 
     pthread_mutex_lock(&mutex); 
    } 
    ~LockThread() 
    { 
     pthread_mutex_unlock(&mutex); 
    } 
    private: 
     pthread_mutex_t& mutex; 
}; 
class Consumer 
{ 
    pthread_mutex_t  lock; 
    pthread_cond_t  cond; 
    std::vector<char> unreadData; 
    public: 
    Consumer() 
    { 
     pthread_mutex_init(&lock,NULL); 
     pthread_cond_init(&cond,NULL); 
    } 
    ~Consumer() 
    { 
     pthread_cond_destroy(&cond); 
     pthread_mutex_destroy(&lock); 
    } 

    private: 
     std::vector<char> read(size_t n) 
     { 
      LockThread locker(lock); 
      while (unreadData.size() < n) 
      { 
       // Must wait until we have n char. 
       // This is a while loop because feed may not put enough in. 

       // pthread_cond() releases the lock. 
       // Thread will not be allowed to continue until 
       // signal is called and this thread reacquires the lock. 

       pthread_cond_wait(&cond,&lock); 

       // Once released from the condition you will have re-aquired the lock. 
       // Thus feed() must have exited and released the lock first. 
      } 

      /* 
      * Not sure if this is exactly what you wanted. 
      * But the data is copied out of the thread safe buffer 
      * into something that can be returned. 
      */ 
      std::vector<char> result(n); // init result with size n 
      std::copy(&unreadData[0], 
         &unreadData[n], 
         &result[0]); 

      unreadData.erase(unreadData.begin(), 
          unreadData.begin() + n); 
      return (result); 
     } 
public: 
    void run() 
    { 
     read(10); 
     read(4839); 
     // etc 
    } 
    void feed(const std::vector<char> &more) 
    { 
     LockThread locker(lock); 

     // Once we acquire the lock we can safely modify the buffer. 
     std::copy(more.begin(),more.end(),std::back_inserter(unreadData)); 

     // Only signal the thread if you have the lock 
     // Otherwise race conditions happen. 
     pthread_cond_signal(&cond); 

     // destructor releases the lock and thus allows read thread to continue. 
    } 
}; 


int main() 
{ 
    Consumer c; 
} 
+0

Điều này trông rất đẹp. Một lưu ý (chỉ là một sàng lọc), nhưng hầu hết các trang web nói rằng bạn cần phải bảo vệ biến điều kiện chính nó với một mutex để ngăn chặn điều kiện chủng tộc. Đa luồng là vui, phải không? –

+0

Biến điều kiện được bảo vệ bởi một mutex. Trong cả hai trường hợp read() và feed(), bạn phải lấy khóa trước khi bạn có thể làm bất cứ điều gì với biến điều kiện. –

+0

Xin lỗi về điều đó. Tôi đã bỏ lỡ nó trong mã của bạn. Rất đẹp. –

1

tôi sẽ ném xuống một số-pseudo-code bán. Dưới đây là ý kiến ​​của tôi:

1) Các hạt khóa rất lớn ở đây. Nếu bạn cần truy cập nhanh hơn, bạn sẽ muốn suy nghĩ lại cấu trúc dữ liệu của mình. STL không phải là luồng an toàn.

2) Khóa sẽ chặn cho đến khi mutex cho phép. Cấu trúc mutex là nó cho phép 1 luồng qua nó cùng lúc với cơ chế khóa/mở khóa. Không cần phải bỏ phiếu hoặc cho một số loại cấu trúc ngoại lệ-esque.

3) Đây là một đoạn cắt khá dễ dàng về cú pháp trong vấn đề. Tôi không chính xác với cú pháp API hay C++, nhưng tôi tin rằng nó đưa ra một giải pháp đúng ngữ nghĩa.

4) Đã chỉnh sửa để trả lời nhận xét.

class piper 
{ 
pthread_mutex queuemutex; 
pthread_mutex readymutex; 
bool isReady; //init to false by constructor 

//whatever else 
}; 

piper::read() 
{//whatever 
pthread_mutex_lock(&queuemutex) 
if(myqueue.size() >= n) 
{ 
    return_queue_vector.push_back(/* you know what to do here */) 

    pthread_mutex_lock(&readymutex) 
    isReady = false; 
    pthread_mutex_unlock(&readymutex) 
} 
pthread_mutex_unlock(&queuemutex) 
} 

piper::push_em_in() 
{ 
//more whatever 
pthread_mutex_lock(&queuemutex) 
//push push push 
if(myqueue.size() >= n) 
{ 
    pthread_mutex_lock(&readymutex) 
    isReady = true; 
    pthread_mutex_unlock(&readymutex) 
} 
pthread_mutex_unlock(&queuemutex) 
} 
+0

Bắt đầu tốt, nhưng hãy nhớ rằng tôi muốn đọc để thành công. Không có gì đảm bảo rằng 'push_em_in' sẽ đổ đủ dữ liệu để điều đó xảy ra. Vì vậy, đọc sẽ cần phải chờ đợi cho đến khi có đủ. Đó là vòng lặp mà tôi muốn chắc chắn là hiệu quả (không quay). –

+0

Bạn cũng có thể sử dụng RAII để đảm bảo khóa() mở khóa() của bạn là ngoại lệ an toàn. –

+0

@Frank, đã thực hiện một cuộc tấn công khác vào khái niệm này. Bạn đang theo dõi làm thế nào để sử dụng mutex pthread tốt hơn bây giờ? –

2

Tôi có xu hướng sử dụng thứ mà tôi gọi là "Hàng đợi được đồng bộ hóa". Tôi quấn đợi bình thường và sử dụng một lớp Semaphore cho cả khóa và đảm đọc khối cũng giống như bạn mong muốn:

#ifndef SYNCQUEUE_20061005_H_ 
#define SYNCQUEUE_20061005_H_ 

#include <queue> 
#include "Semaphore.h" 

// similar, but slightly simpler interface to std::queue 
// this queue implementation will serialize pushes and pops 
// and block on a pop while empty (as apposed to throwing an exception) 
// it also locks as neccessary on insertion and removal to avoid race 
// conditions 

template <class T, class C = std::deque<T> > class SyncQueue { 
protected: 
    std::queue<T, C> m_Queue; 
    Semaphore   m_Semaphore; 
    Mutex    m_Mutex; 

public: 
    typedef typename std::queue<T, C>::value_type value_type; 
    typedef typename std::queue<T, C>::size_type size_type; 

    explicit SyncQueue(const C& a = C()) : m_Queue(a), m_Semaphore(0) {} 

    bool empty() const    { return m_Queue.empty(); } 
    size_type size() const   { return m_Queue.size(); } 

    void push(const value_type& x); 
    value_type pop(); 
}; 

template <class T, class C> 
void SyncQueue<T, C>::push(const SyncQueue<T, C>::value_type &x) { 
    // atomically push item 
    m_Mutex.lock(); 
    m_Queue.push(x); 
    m_Mutex.unlock(); 

    // let blocking semaphore know another item has arrived 
    m_Semaphore.v(); 
} 

template <class T, class C> 
typename SyncQueue<T, C>::value_type SyncQueue<T, C>::pop() { 
    // block until we have at least one item 
    m_Semaphore.p(); 

    // atomically read and pop front item 
    m_Mutex.lock(); 
    value_type ret = m_Queue.front(); 
    m_Queue.pop(); 
    m_Mutex.unlock(); 

    return ret; 
} 

#endif 

Bạn có thể thực hiện Cột và mutexes với nguyên thủy phù hợp trong việc thực hiện luồng của bạn. Chú ý: việc thực hiện này là một ví dụ cho các phần tử đơn lẻ trong một hàng đợi, nhưng bạn có thể dễ dàng bọc nó với một chức năng mà đệm kết quả cho đến khi N được cung cấp.một cái gì đó như thế này nếu nó là một hàng đợi của ký tự:

std::vector<char> func(int size) { 
    std::vector<char> result; 
    while(result.size() != size) { 
     result.push_back(my_sync_queue.pop()); 
    } 
    return result; 
} 
1

Chỉ để cho vui, đây là một triển khai nhanh chóng và bẩn bằng cách sử dụng Boost. Nó sử dụng pthreads dưới mui xe trên nền tảng hỗ trợ nó, và trên cửa sổ sử dụng các hoạt động cửa sổ.

boost::mutex access; 
boost::condition cond; 

// consumer 
data read() 
{ 
    boost::mutex::scoped_lock lock(access); 
    // this blocks until the data is ready 
    cond.wait(lock); 

    // queue is ready 
    return data_from_queue(); 
} 

// producer 
void push(data) 
{ 
    boost::mutex::scoped_lock lock(access); 
    // add data to queue 

    if (queue_has_enough_data()) 
    cond.notify_one(); 
} 
+0

Điều kiện chỉ được thông báo nếu có đủ dữ liệu để vòng lặp không cần thiết - bạn nên đọc lên các chủ đề tăng và biến điều kiện, mã đúng và không có bế tắc –

+0

Điều đó có nghĩa là, điều kiện hoạt động tốt và giải phóng khóa trước khi chặn –

1

Để vui hơn, đây là phiên bản cuối cùng của tôi. STL-ized không có lý do chính đáng. :-)

#include <algorithm> 
#include <deque> 
#include <pthread.h> 

template<typename T> 
class MultithreadedReader { 
    std::deque<T> buffer; 
    pthread_mutex_t moreDataMutex; 
    pthread_cond_t moreDataCond; 

protected: 
    template<typename OutputIterator> 
    void read(size_t count, OutputIterator result) { 
     pthread_mutex_lock(&moreDataMutex); 

     while (buffer.size() < count) { 
      pthread_cond_wait(&moreDataCond, &moreDataMutex); 
     } 
     std::copy(buffer.begin(), buffer.begin() + count, result); 
     buffer.erase(buffer.begin(), buffer.begin() + count); 

     pthread_mutex_unlock(&moreDataMutex); 
    } 

public: 
    MultithreadedReader() { 
     pthread_mutex_init(&moreDataMutex, 0); 
     pthread_cond_init(&moreDataCond, 0); 
    } 

    ~MultithreadedReader() { 
     pthread_cond_destroy(&moreDataCond); 
     pthread_mutex_destroy(&moreDataMutex); 
    } 

    template<typename InputIterator> 
    void feed(InputIterator first, InputIterator last) { 
     pthread_mutex_lock(&moreDataMutex); 

     buffer.insert(buffer.end(), first, last); 
     pthread_cond_signal(&moreDataCond); 

     pthread_mutex_unlock(&moreDataMutex); 
    } 
}; 
+0

@Frank: tại sao read() được bảo vệ? – Kim

+0

Lớp học được thiết kế như là một lớp cơ sở mà tiểu loại đã tự đọc và chỉ muốn được cho ăn. Đó là một giao thức truyền trực tuyến, nơi lớp học giống như một ứng dụng unix nhỏ. –