2013-08-09 36 views
9

Giả sử tôi muốn sử dụng bản đồ có thể thay đổi trong Scala để theo dõi số lần tôi đã xem một số chuỗi. Trong một bối cảnh đơn luồng, điều này rất dễ dàng:Chuyển đổi một cách an toàn giá trị trong bản đồ có thể thay đổi

import scala.collection.mutable.{ Map => MMap } 

class Counter { 
    val counts = MMap.empty[String, Int].withDefaultValue(0) 

    def add(s: String): Unit = counts(s) += 1 
} 

Thật không may này không phải là thread-an toàn, kể từ khi getupdate không xảy ra nguyên tử.

Concurrent maps thêm a few atomic operations API bản đồ có thể thay đổi, nhưng không phải là người tôi cần, đó sẽ giống như thế này:

def replace(k: A, f: B => B): Option[B] 

Tôi biết tôi có thể sử dụng ScalaSTM 's TMap:

import scala.concurrent.stm._ 

class Counter { 
    val counts = TMap.empty[String, Int] 

    def add(s: String): Unit = atomic { implicit txn => 
    counts(s) = counts.get(s).getOrElse(0) + 1 
    } 
} 

Nhưng (hiện tại) đó vẫn là một sự phụ thuộc phụ. Các tùy chọn khác sẽ bao gồm các tác nhân (phụ thuộc khác), đồng bộ hóa (có khả năng kém hiệu quả hơn), hoặc atomic references (less idiomatic) của Java.

Nói chung tôi muốn tránh các bản đồ có thể thay đổi trong Scala, nhưng đôi khi tôi cần loại điều này, và gần đây nhất tôi đã sử dụng phương pháp STM (thay vì chỉ lướt ngón tay của tôi và hy vọng tôi không nhận được cắn bởi giải pháp ngây thơ).

Tôi biết có một số sự đánh đổi ở đây (phụ thuộc bổ sung so với hiệu suất so với sự rõ ràng, v.v.), nhưng có bất kỳ điều gì giống như câu trả lời "đúng" cho vấn đề này trong Scala 2.10 không?

+1

những gì về một diễn viên Akka duy nhất ghi vào bản đồ có thể thay đổi? 'Counter.add' chỉ gửi một tin nhắn lửa và quên cho nó. Đối với lần đọc, tùy thuộc vào nhu cầu của bạn, chúng có thể xảy ra đồng thời hoặc cũng thông qua diễn viên. – gourlaysama

Trả lời

3

Giải pháp đơn giản nhất chắc chắn là đồng bộ hóa. Nếu không có quá nhiều tranh chấp, hiệu suất có thể không phải là xấu.

Nếu không, bạn có thể thử cuộn lên STM giống như thực hiện replace STM của riêng bạn. Một cái gì đó như thế này có thể làm:

object ConcurrentMapOps { 
    private val rng = new util.Random 
    private val MaxReplaceRetryCount = 10 
    private val MinReplaceBackoffTime: Long = 1 
    private val MaxReplaceBackoffTime: Long = 20 
} 
implicit class ConcurrentMapOps[A, B](val m: collection.concurrent.Map[A,B]) { 
    import ConcurrentMapOps._ 
    private def replaceBackoff() { 
    Thread.sleep((MinReplaceBackoffTime + rng.nextFloat * (MaxReplaceBackoffTime - MinReplaceBackoffTime)).toLong) // A bit crude, I know 
    } 

    def replace(k: A, f: B => B): Option[B] = { 
    m.get(k) match { 
     case None => return None 
     case Some(old) => 
     var retryCount = 0 
     while (retryCount <= MaxReplaceRetryCount) { 
      val done = m.replace(k, old, f(old)) 
      if (done) { 
      return Some(old) 
      } 
      else {   
      retryCount += 1 
      replaceBackoff() 
      } 
     } 
     sys.error("Could not concurrently modify map") 
    } 
    } 
} 

Lưu ý rằng sự cố va chạm được bản địa hóa cho một khóa nhất định. Nếu hai luồng truy cập cùng một bản đồ nhưng hoạt động trên các khóa riêng biệt, bạn sẽ không có xung đột và thao tác thay thế sẽ luôn thành công lần đầu tiên. Nếu một va chạm được phát hiện, chúng tôi chờ một chút (một khoảng thời gian ngẫu nhiên, để giảm thiểu tính chất của các chủ đề chiến đấu mãi mãi cho cùng một khóa) và thử lại.

Tôi không thể đảm bảo rằng tính năng này sẵn sàng cho sản xuất (tôi vừa mới ném nó ngay bây giờ), nhưng điều đó có thể làm được điều đó.

CẬP NHẬT: Tất nhiên (như Ionuț G. Stan đã chỉ ra), nếu tất cả những gì bạn muốn là tăng/giảm giá trị, thì java ConcurrentHashMap đã cung cấp các hoạt động theo cách không có khóa. Giải pháp trên của tôi áp dụng nếu bạn cần một phương pháp tổng quát hơn replace sẽ lấy hàm chuyển đổi làm tham số.

+0

Tôi nhận thấy trong mã Bản đồ anh đã chuyển sang ThreadLocalRandom https://github.com/scala/scala/blob/master/src/library/scala/collection/concurrent/TrieMap.scala#L473 –

10

Làm thế nào về cái này? Giả sử bạn không thực sự cần phương thức chung replace ngay bây giờ, chỉ cần một bộ đếm.

import java.util.concurrent.ConcurrentHashMap 
import java.util.concurrent.atomic.AtomicInteger 

object CountedMap { 
    private val counts = new ConcurrentHashMap[String, AtomicInteger] 

    def add(key: String): Int = { 
    val zero = new AtomicInteger(0) 
    val value = Option(counts.putIfAbsent(key, zero)).getOrElse(zero) 
    value.incrementAndGet 
    } 
} 

Bạn nhận được hiệu suất tốt hơn đồng bộ hóa trên toàn bộ bản đồ và bạn cũng nhận được gia số nguyên tử.

+0

Cảm ơn — Tôi quan tâm đến trường hợp chung, nhưng thật tốt khi thấy rằng điều này thật dễ dàng. –

+0

Đây là giải pháp đúng và tận dụng các thư viện đồng thời Java hiệu năng rất cao. –

+1

Tôi rất tò mò nếu có lý do để liên lạc với ConcurrentHashMap thay vì đồng thời.TrieMap. Tôi không có ý kiến, chỉ là diễn đàn là một quảng cáo cho API. –

2

Bạn đang gặp rắc rối nếu bản đồ của bạn chỉ ngồi ở đó dưới dạng val.Nếu nó đáp ứng trường hợp sử dụng của bạn, tôi khuyên bạn nên dùng một cái gì đó như

class Counter { 
    private[this] myCounts = MMap.empty[String, Int].withDefaultValue(0) 
    def counts(s: String) = myCounts.synchronized { myCounts(s) } 
    def add(s: String) = myCounts.synchronized { myCounts(s) += 1 } 
    def getCounts = myCounts.synchronized { Map[String,Int]() ++ myCounts } 
} 

để sử dụng ít tranh chấp. Để tranh chấp cao, bạn nên sử dụng bản đồ đồng thời được thiết kế để hỗ trợ việc sử dụng như vậy (ví dụ: java.util.concurrent.ConcurrentHashMap) và quấn các giá trị trong AtomicWhatever.

2

Nếu bạn là ok để làm việc với giao diện dựa trên tương lai:

trait SingleThreadedExecutionContext { 
    val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) 
} 

class Counter extends SingleThreadedExecutionContext { 
    private val counts = MMap.empty[String, Int].withDefaultValue(0) 

    def get(s: String): Future[Int] = future(counts(s))(ec) 

    def add(s: String): Future[Unit] = future(counts(s) += 1)(ec) 
} 

thử nghiệm sẽ trông giống như:

class MutableMapSpec extends Specification { 

    "thread safe" in { 

    import ExecutionContext.Implicits.global 

    val c = new Counter 
    val testData = Seq.fill(16)("1") 
    await(Future.traverse(testData)(c.add)) 
    await(c.get("1")) mustEqual 16 
    } 
} 
+0

Đây không phải là chủ đề an toàn. Trong khi bạn đảm bảo một nhà văn duy nhất tại một thời điểm, bạn vẫn có thể đọc chủ đề trong khi bản đồ đang được sửa đổi –

+0

Như tôi hiểu, tất cả các hoạt động - đọc, viết, trộn - sử dụng ec vì ngữ cảnh sẽ là chủ đề an toàn. Ops bên ngoài bối cảnh đó sẽ không được an toàn thread. Sẽ rất vui khi được nghe từ những người khác nếu hiểu biết này là chính xác. –

+0

Nhưng vấn đề là, việc đọc được thực hiện trực tiếp: khi bạn truy cập 'c.counts' bạn không sử dụng' ExecutionContext' chút nào. –