2013-01-10 6 views
14

Có ai biết nếu có bất kỳ thực hiện chốt nào sau đây:Latch có thể được tăng lên

  • có một phương pháp để giảm các giá trị chốt, hoặc chờ nếu giá trị là zero
  • có một phương pháp đã chờ đợi cho giá trị chốt là zero
  • có một phương pháp để bổ sung thêm một số giá trị của chốt

Trả lời

4

Thay vì bắt đầu lại từ AQS, bạn có thể sử dụng triển khai đơn giản như dưới đây. Nó hơi ngây thơ (nó được đồng bộ hóa so với thuật toán không khóa AQS) nhưng trừ khi bạn mong muốn sử dụng nó trong một kịch bản có nội dung, nó có thể đủ tốt.

public class CountUpAndDownLatch { 
    private CountDownLatch latch; 
    private final Object lock = new Object(); 

    public CountUpAndDownLatch(int count) { 
     this.latch = new CountDownLatch(count); 
    } 

    public void countDownOrWaitIfZero() throws InterruptedException { 
     synchronized(lock) { 
      while(latch.getCount() == 0) { 
       lock.wait(); 
      } 
      latch.countDown(); 
      lock.notifyAll(); 
     } 
    } 

    public void waitUntilZero() throws InterruptedException { 
     synchronized(lock) { 
      while(latch.getCount() != 0) { 
       lock.wait(); 
      } 
     } 
    } 

    public void countUp() { //should probably check for Integer.MAX_VALUE 
     synchronized(lock) { 
      latch = new CountDownLatch((int) latch.getCount() + 1); 
      lock.notifyAll(); 
     } 
    } 

    public int getCount() { 
     synchronized(lock) { 
      return (int) latch.getCount(); 
     } 
    } 
} 

Lưu ý: Tôi đã không kiểm tra nó trong chiều sâu nhưng có vẻ như cư xử như mong đợi:

public static void main(String[] args) throws InterruptedException { 
    final CountUpAndDownLatch latch = new CountUpAndDownLatch(1); 
    Runnable up = new Runnable() { 
     @Override 
     public void run() { 
      try { 
       System.out.println("IN UP " + latch.getCount()); 
       latch.countUp(); 
       System.out.println("UP " + latch.getCount()); 
      } catch (InterruptedException ex) { 
      } 
     } 
    }; 

    Runnable downOrWait = new Runnable() { 
     @Override 
     public void run() { 
      try { 
       System.out.println("IN DOWN " + latch.getCount()); 
       latch.countDownOrWaitIfZero(); 
       System.out.println("DOWN " + latch.getCount()); 
      } catch (InterruptedException ex) { 
      } 
     } 
    }; 

    Runnable waitFor0 = new Runnable() { 
     @Override 
     public void run() { 
      try { 
       System.out.println("WAIT FOR ZERO " + latch.getCount()); 
       latch.waitUntilZero(); 
       System.out.println("ZERO " + latch.getCount()); 
      } catch (InterruptedException ex) { 
      } 
     } 
    }; 
    new Thread(waitFor0).start(); 
    up.run(); 
    downOrWait.run(); 
    Thread.sleep(100); 
    downOrWait.run(); 
    new Thread(up).start(); 
    downOrWait.run(); 
} 

Output:

IN UP 1 
UP 2 
WAIT FOR ZERO 1 
IN DOWN 2 
DOWN 1 
IN DOWN 1 
ZERO 0 
DOWN 0 
IN DOWN 0 
IN UP 0 
DOWN 0 
UP 0 
+0

Vâng, điều đó sẽ hiệu quả. Nhưng tôi muốn một cái gì đó mà không sử dụng 2 đối tượng đồng bộ hóa để đạt được điều này. Tôi đã thực hiện một bằng cách sử dụng LockSupport và hiện đang thử nghiệm nó để xem nếu nó hoạt động và thực hiện okay (AQS dường như không được tốt cho việc này hoặc là). – Razvi

+0

@ dead10ck tại sao không? – assylias

4

java.util.concurrent.Semaphore dường như phù hợp với những hóa đơn.

  • Acquire() hoặc mua lại (n)
  • cũng được() (không chắc chắn tôi hiểu những gì là sự khác biệt là ở đây) (*)
  • phát hành() hoặc phát hành (n)

(*) Được rồi, không có phương pháp tích hợp để chờ cho đến khi semaphore trở thành không có sẵn. Tôi cho rằng bạn sẽ viết trình bao bọc của riêng mình cho acquire trước tiên là tryAcquire và nếu điều đó không kích hoạt "sự kiện bận rộn" của bạn (và tiếp tục sử dụng thông thường acquire). Mọi người sẽ cần phải gọi wrapper của bạn. Có thể phân lớp Semaphore?

+0

Nó gần tốt. Nhưng tôi không thể chờ đợi nó được bằng không từ những gì tôi biết. – Razvi

+0

cũng có được() giống như mua (1) – Razvi

+0

@assylias: Cảm ơn, đã cập nhật (Tôi chỉ lấy những gì Google cung cấp khi tôi tìm kiếm tên lớp) – Thilo

0

tôi cần một và xây dựng nó bằng cách sử dụng cùng một chiến lược như CountDownLatch sử dụng AQS (không chặn), lớp này cũng rất giống (nếu không chính xác) với lớp được tạo cho Apache Camel, tôi nghĩ nó cũng nhẹ hơn JDK Phaser, nó sẽ hoạt động giống như CountDownLact từ JDK, không cho phép bạn đếm ngược dưới 0 và sẽ cho phép bạn đếm ngược lên:

nhập java.util.concurrent.TimeUnit; nhập java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountingLatch 
{ 
    /** 
    * Synchronization control for CountingLatch. 
    * Uses AQS state to represent count. 
    */ 
    private static final class Sync extends AbstractQueuedSynchronizer 
    { 
    private Sync() 
    { 
    } 

    private Sync(final int initialState) 
    { 
     setState(initialState); 
    } 

    int getCount() 
    { 
     return getState(); 
    } 

    protected int tryAcquireShared(final int acquires) 
    { 
     return getState()==0 ? 1 : -1; 
    } 

    protected boolean tryReleaseShared(final int delta) 
    { 
     // Decrement count; signal when transition to zero 
     for(; ;){ 
     final int c=getState(); 
     final int nextc=c+delta; 
     if(nextc<0){ 
      return false; 
     } 
     if(compareAndSetState(c,nextc)){ 
      return nextc==0; 
     } 
     } 
    } 
    } 

    private final Sync sync; 

    public CountingLatch() 
    { 
    sync=new Sync(); 
    } 

    public CountingLatch(final int initialCount) 
    { 
    sync=new Sync(initialCount); 
    } 

    public void increment() 
    { 
    sync.releaseShared(1); 
    } 

    public int getCount() 
    { 
    return sync.getCount(); 
    } 

    public void decrement() 
    { 
    sync.releaseShared(-1); 
    } 

    public void await() throws InterruptedException 
    { 
    sync.acquireSharedInterruptibly(1); 
    } 

    public boolean await(final long timeout) throws InterruptedException 
    { 
    return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout)); 
    } 
} 
1

Đối với những người cần một giải pháp AQS dựa, đây cũng là loại làm việc cho tôi:

public class CountLatch { 

    private class Sync extends AbstractQueuedSynchronizer { 
     private static final long serialVersionUID = 1L; 

     public Sync() { 
     } 

     @Override 
     protected int tryAcquireShared(int arg) { 
      return count.get() == releaseValue ? 1 : -1; 
     } 

     @Override 
     protected boolean tryReleaseShared(int arg) { 
      return true; 
     } 
    } 

    private final Sync sync; 
    private final AtomicLong count; 
    private volatile long releaseValue; 

    public CountLatch(final long initial, final long releaseValue) { 
     this.releaseValue = releaseValue; 
     this.count = new AtomicLong(initial); 
     this.sync = new Sync(); 
    } 

    public void await() throws InterruptedException { 
     sync.acquireSharedInterruptibly(1); 
    } 

    public long countUp() { 
     final long current = count.incrementAndGet(); 
     if (current == releaseValue) { 
      sync.releaseShared(0); 
     } 
     return current; 
    } 

    public long countDown() { 
     final long current = count.decrementAndGet(); 
     if (current == releaseValue) { 
      sync.releaseShared(0); 
     } 
     return current; 
    } 

    public long getCount() { 
     return count.get(); 
    } 
} 

Bạn khởi Synchronizer với giá trị ban đầu và mục tiêu. Khi đã đạt được giá trị đích (bằng cách đếm lên và/hoặc xuống), các chuỗi chờ sẽ được giải phóng.

30

Bạn cũng có thể sử dụng một Phaser (java.util.concurrent.Phaser)

final Phaser phaser = new Phaser(1); // register self 
while (/* some condition */) { 
    phaser.register(); // Equivalent to countUp 
    // do some work asynchronously, invoking 
    // phaser.arriveAndDeregister() (equiv to countDown) in a finally block 
} 
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete 

Tôi hy vọng điều này sẽ giúp.

+1

Tôi thấy điều này linh hoạt hơn bất kỳ tùy chọn nào khác. – donlys

0

Đây là biến thể trên CounterLatch, có sẵn trên trang web Apache.

Phiên bản của chúng, vì những lý do được biết đến nhiều nhất, chặn chuỗi người gọi trong khi biến (AtomicInteger) có giá trị nhất định.

Nhưng đó là chiều cao của sự yên tĩnh để tinh chỉnh mã này để bạn có thể chọn hoặc chỉ là phiên bản Apache, hoặc ... để nói "chờ ở đây cho đến khi bộ đếm đạt đến một giá trị nhất định". Có thể cho rằng sau này sẽ có nhiều ứng dụng hơn. Trong trường hợp cụ thể của tôi, tôi đã xáo trộn điều này bởi vì tôi muốn kiểm tra xem tất cả các "khối" đã được xuất bản trong SwingWorker.process() ... nhưng tôi đã tìm thấy các ứng dụng khác cho nó.

Ở đây nó được viết bằng Jython, chính thức là ngôn ngữ tốt nhất trên thế giới (TM). Tôi sẽ xáo trộn lên một phiên bản Java trong khóa học do.

class CounterLatch(): 
    def __init__(self, initial = 0, wait_value = 0, lift_on_reached = True): 
     self.count = java.util.concurrent.atomic.AtomicLong(initial) 
     self.signal = java.util.concurrent.atomic.AtomicLong(wait_value) 

     class Sync(java.util.concurrent.locks.AbstractQueuedSynchronizer): 
      def tryAcquireShared(sync_self, arg): 
       if lift_on_reached: 
        return -1 if ((not self.released.get()) and self.count.get() != self.signal.get()) else 1 
       else: 
        return -1 if ((not self.released.get()) and self.count.get() == self.signal.get()) else 1 
      def tryReleaseShared(self, args): 
       return True 

     self.sync = Sync() 
     self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False 

    def await(self, *args): 
     if args: 
      assert len(args) == 2 
      assert type(args[ 0 ]) is int 
      timeout = args[ 0 ] 
      assert type(args[ 1 ]) is java.util.concurrent.TimeUnit 
      unit = args[ 1 ] 
      return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)) 
     else: 
      self.sync.acquireSharedInterruptibly(1) 

    def count_relative(self, n): 
     previous = self.count.addAndGet(n) 
     if previous == self.signal.get(): 
      self.sync.releaseShared(0) 
     return previous 

NB phiên bản Apache sử dụng từ khóa volatile cho signalreleased. Trong Jython tôi không nghĩ rằng điều này tồn tại như vậy, nhưng sử dụng AtomicIntegerAtomicBoolean phải đảm bảo rằng không có giá trị nào là "lỗi thời" trong bất kỳ chuỗi nào.

Ví dụ sử dụng:

Trong constructor SwingWorker:

self.publication_counter_latch = CounterLatch() 

Trong SW.publish:

# increase counter value BEFORE publishing chunks 
self.publication_counter_latch.count_relative(len(chunks)) 
self.super__publish(chunks) 

Trong SW.process:

# ... do sthg [HERE] with the chunks! 
# AFTER having done what you want to do with your chunks: 
self.publication_counter_latch.count_relative(- len(chunks)) 

Trong thread đang đợi xử lý đoạn văn bản t o stop:

worker.publication_counter_latch.await() 
0

Có vẻ như một CountDownLatch sẽ làm như bạn muốn:

Một CountDownLatch được khởi tạo với một số lượng nhất định. Các phương thức chờ đợi khối cho đến khi số đếm hiện tại đạt đến 0 do yêu cầu phương thức countDown(), sau đó tất cả các chuỗi đang chờ được giải phóng và bất kỳ lời gọi tiếp theo nào đang chờ trở lại ngay lập tức. Đây là một hiện tượng một lần - không thể đặt lại số lượng. Nếu bạn cần phiên bản đặt lại số lượng, hãy xem xét sử dụng CyclicBarrier.

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html