2013-04-07 18 views
10

Tôi có một mảng lớn các đối tượng tùy chỉnh mà tôi cần thực hiện các tác vụ độc lập (có thể song song), bao gồm sửa đổi các tham số đối tượng. Tôi đã thử sử dụng cả một Manager(). Dict, và 'sharedmem'ory, nhưng cả hai đều không hoạt động. Ví dụ:Sửa đổi đối tượng trong đa xử lý python

import numpy as np 
import multiprocessing as mp 
import sharedmem as shm 


class Tester: 

    num = 0.0 
    name = 'none' 
    def __init__(self,tnum=num, tname=name): 
     self.num = tnum 
     self.name = tname 

    def __str__(self): 
     return '%f %s' % (self.num, self.name) 

def mod(test, nn): 
    test.num = np.random.randn() 
    test.name = nn 


if __name__ == '__main__': 

    num = 10 

    tests = np.empty(num, dtype=object) 
    for it in range(num): 
     tests[it] = Tester(tnum=it*1.0) 

    sh_tests = shm.empty(num, dtype=object) 
    for it in range(num): 
     sh_tests[it] = tests[it] 
     print sh_tests[it] 

    print '\n' 
    workers = [ mp.Process(target=mod, args=(test, 'some')) for test in sh_tests ] 

    for work in workers: work.start() 

    for work in workers: work.join() 

    for test in sh_tests: print test 

in ra:

0.000000 none 
1.000000 none 
2.000000 none 
3.000000 none 
4.000000 none 
5.000000 none 
6.000000 none 
7.000000 none 
8.000000 none 
9.000000 none 


0.000000 none 
1.000000 none 
2.000000 none 
3.000000 none 
4.000000 none 
5.000000 none 
6.000000 none 
7.000000 none 
8.000000 none 
9.000000 none 

Tức là các đối tượng không được sửa đổi.

Làm cách nào để đạt được hành vi mong muốn?

+0

http: // stackoverflow.com/questions/10721915/shared-memory-objects-in-python-multiprocessing – tacaswell

+0

bạn có thể gửi một liên kết đến 'sharedmem' tôi không thể tìm thấy bất cứ điều gì trên đó. – tacaswell

Trả lời

8

Vấn đề là khi các đối tượng được chuyển đến quy trình công nhân, chúng được đóng gói bằng dưa, được chuyển đến quy trình khác, nơi chúng được giải nén và làm việc. Các đối tượng của bạn không được truyền nhiều cho quá trình khác, như được nhân bản. Bạn không trả lại các đối tượng, vì vậy đối tượng nhân bản được sửa đổi vui vẻ, và sau đó bị vứt đi.

Có vẻ như điều này không thể thực hiện được (Python: Possible to share in-memory data between 2 separate processes) trực tiếp.

Điều bạn có thể làm là trả lại các đối tượng đã sửa đổi.

import numpy as np 
import multiprocessing as mp 



class Tester: 

    num = 0.0 
    name = 'none' 
    def __init__(self,tnum=num, tname=name): 
     self.num = tnum 
     self.name = tname 

    def __str__(self): 
     return '%f %s' % (self.num, self.name) 

def mod(test, nn, out_queue): 
    print test.num 
    test.num = np.random.randn() 
    print test.num 
    test.name = nn 
    out_queue.put(test) 




if __name__ == '__main__':  
    num = 10 
    out_queue = mp.Queue() 
    tests = np.empty(num, dtype=object) 
    for it in range(num): 
     tests[it] = Tester(tnum=it*1.0) 


    print '\n' 
    workers = [ mp.Process(target=mod, args=(test, 'some', out_queue)) for test in tests ] 

    for work in workers: work.start() 

    for work in workers: work.join() 

    res_lst = [] 
    for j in range(len(workers)): 
     res_lst.append(out_queue.get()) 

    for test in res_lst: print test 

này dẫn tới việc quan sát thú vị mà bởi vì các quá trình sinh ra là giống hệt nhau, tất cả họ đều bắt đầu với hạt giống tương tự cho các số ngẫu nhiên, vì vậy họ tất cả tạo ra cùng một số 'ngẫu nhiên'.

+0

Dòng '' workers = [mp.Process (... '' trông giống như bạn đang bắt đầu các quá trình '' num'' (tất cả cùng một lúc?) Trong ví dụ của bạn, nó chỉ là mười, nhưng làm thế nào bạn áp dụng điều này cho các mảng lớn hơn chứa hàng nghìn hoặc hàng triệu mục nhập (và do đó là công nhân) –

3

Tôi không thấy bạn truyền tham chiếu shm vào các tiến trình con nên tôi không thấy cách làm việc của chúng có thể được ghi lại vào bộ nhớ dùng chung. Có lẽ tôi đang thiếu một cái gì đó ở đây.

Ngoài ra, bạn đã xem xét numpy.memmap chưa? (BTW: tcaswell, các mô-đun được gọi ở đây có vẻ là: numpy-sharedmem). Ngoài ra, bạn có thể muốn đọc số Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing (PDF) của Sturla Molden như được đề xuất trong câu trả lời của unutbu cho [StackOverflow: Làm cách nào để truyền các mảng có khối lượng lớn giữa các tiến trình con trăn mà không lưu vào đĩa?] Và (How do I pass large numpy arrays between python subprocesses without saving to disk?). và của Joe Kington StackOverflow: NumPy vs. multiprocessing and mmap.

Đây có thể truyền cảm hứng hơn là có liên quan trực tiếp.

+0

cảm ơn con trỏ, nhưng không chắc đó có phải là gói thích hợp hay không – tacaswell

+0

+1 cho bộ sưu tập tốt đẹp của các liên kết! – tacaswell

3

Mã của bạn không cố gắng sửa đổi bộ nhớ dùng chung. Nó chỉ nhân bản các đối tượng riêng lẻ.

dtype=object nghĩa là sharedmem sẽ không làm việc vì lý do nêu in the link provided by @tcaswell:

chia sẻ đồ thị đối tượng đó bao gồm tài liệu tham khảo/con trỏ đến các đối tượng khác về cơ bản là không khả thi

Đối với đồng bằng (giá trị) các loại bạn có thể sử dụng bộ nhớ dùng chung, xem Use numpy array in shared memory for multiprocessing.

Cách tiếp cận manager cũng nên làm việc (nó chỉ là bản sao các đối tượng xung quanh):

import random 
from multiprocessing import Pool, Manager 

class Tester(object): 
    def __init__(self, num=0.0, name='none'): 
     self.num = num 
     self.name = name 

    def __repr__(self): 
     return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name) 

def init(L): 
    global tests 
    tests = L 

def modify(i_t_nn): 
    i, t, nn = i_t_nn 
    t.num += random.normalvariate(mu=0, sigma=1) # modify private copy 
    t.name = nn 
    tests[i] = t # copy back 
    return i 

def main(): 
    num_processes = num = 10 #note: num_processes and num may differ 
    manager = Manager() 
    tests = manager.list([Tester(num=i) for i in range(num)]) 
    print(tests[:2]) 

    args = ((i, t, 'some') for i, t in enumerate(tests)) 
    pool = Pool(processes=num_processes, initializer=init, initargs=(tests,)) 
    for i in pool.imap_unordered(modify, args): 
     print("done %d" % i) 
    pool.close() 
    pool.join() 
    print(tests[:2]) 

if __name__ == '__main__': 
    main()