Có ai biết cách viết một hàm mở rộng trả về một ParallelQuery trong PLINQ không?Làm cách nào để viết một hàm mở rộng nhận biết luồng cho PLINQ?
Cụ thể hơn, tôi có vấn đề sau: Tôi muốn thực hiện chuyển đổi trong truy vấn PLINQ cần động cơ, công cụ tạo tốn kém và không thể truy cập đồng thời.
tôi có thể làm như sau:
var result = source.AsParallel().Select ((i) => { var e = new Engine(); return e.Process(i); })
Ở đây, động cơ được tạo ra một lần mỗi mục, mà là quá tốn kém.
Tôi muốn động cơ được tạo sau khi cho mỗi chủ đề.
Với tổng hợp, tôi có thể đến gần với những gì tôi muốn với một cái gì đó giống như
// helper class: engine to use plus list of results obtained in thread so far
class EngineAndResults {
public Engine engine = null;
public IEnumerable<ResultType> results;
}
var result = source.AsParallel().Aggregate (
// done once per block of items (=thread),
// returning an empty list, but a new engine
() => new EngineAndList() {
engine = new Engine(),
results = Enumerable.Empty<ResultType>()
},
// we process a new item and put it to the thread-local list,
// preserving the engine for further use
(engineAndResults, item) => new EngineAndResults() {
engine = engineAndResults.engine,
results = Enumerable.Concat (
engineAndResults.results,
new ResultType [] { engineAndResults.engine.Process (item) }
)
},
// tell linq how to aggregate across threads
(engineAndResults1, engineAndResults2) => new EngineAndResults() {
engine = engineAndResults1.engine,
results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results)
},
// after all aggregations, how do we come to the result?
engineAndResults => engineAndResults.results
);
Như bạn thấy, tôi lạm dụng accumulator để thực hiện một động cơ cho mỗi thread. Vấn đề ở đây là PLINQ cuối cùng kết hợp các kết quả thành một IEnumerable duy nhất, làm cho các luồng được đồng bộ hóa. Đây không phải là rất tốt đẹp nếu tôi muốn nối thêm một phần mở rộng PLINQ sau đó.
tôi sẽ đánh giá cao điều gì đó như
var result = source.AsParallel()
.SelectWithThreadwiseInitWhichIAmLookingFor (
() => new Engine(),
(engine, item) => engine.Process (item)
)
Không ai có bất kỳ ý tưởng làm thế nào để đạt được điều này?
Cảm ơn bạn. Đây là một giải pháp tốt. Tôi đã làm một thử nghiệm ngắn, và nó có vẻ hoạt động tốt. Tôi đã cố gắng tìm một cách để đưa khởi tạo vào chức năng mở rộng, nhưng không thành công - rõ ràng ThreadLocal phải được tạo trước khi AsParallel được gọi. Tôi không thấy lý do cho điều đó, nhưng dù sao, đây không phải là một vấn đề lớn. – JohnB
Tôi nghĩ rằng nó không hoạt động, bởi vì bạn đã tạo một 'ThreadLocal' mới cho mỗi lần lặp, do đó không thể chia sẻ cho các lần lặp thực hiện trên cùng một luồng. Tất cả các lần lặp chạy trên cùng một luồng cần cùng một thể hiện của 'ThreadLocal'. – svick