2012-02-12 6 views
8

Tôi đang làm việc trên hàm appengine-mapreduce và đã sửa đổi bản trình diễn cho phù hợp với mục đích của tôi. Về cơ bản, tôi có một triệu dòng trên định dạng sau: userid, time1, time2. Mục đích của tôi là tìm sự khác biệt giữa time1 và time2 cho mỗi userid.Giới hạn bộ nhớ được nhấn với appengine-mapreduce

Tuy nhiên, khi tôi chạy này trên Google App Engine, tôi gặp phải thông báo lỗi này trong phần log:

Đã vượt quá giới hạn bộ nhớ tin mềm với 180,56 MB sau khi phục vụ 130 yêu cầu tổng Trong khi xử lý yêu cầu này, quá trình xử lý yêu cầu này được phát hiện là sử dụng quá nhiều bộ nhớ và đã bị chấm dứt. Điều này có khả năng gây ra một quy trình mới được sử dụng cho yêu cầu tiếp theo đối với đơn đăng ký của bạn. Nếu bạn thấy thông báo này thường xuyên, bạn có thể bị rò rỉ bộ nhớ trong ứng dụng của mình.

def time_count_map(data): 
    """Time count map function.""" 
    (entry, text_fn) = data 
    text = text_fn() 

    try: 
    q = text.split('\n') 
    for m in q: 
     reader = csv.reader([m.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
    logging.debug(e) 


def time_count_reduce(key, values): 
    """Time count reduce function.""" 
    time = 0.0 
    for subtime in values: 
    time += float(subtime) 
    realtime = int(time) 
    yield "%s: %d\n" % (key, realtime) 

Mọi người có thể đề xuất cách khác để tối ưu hóa mã của tôi tốt hơn không? Cảm ơn!!

được sửa đổi:

Dưới đây là xử lý đường ống:

class TimeCountPipeline(base_handler.PipelineBase): 
    """A pipeline to run Time count demo. 

    Args: 
    blobkey: blobkey to process as string. Should be a zip archive with 
     text files inside. 
    """ 

    def run(self, filekey, blobkey): 
    logging.debug("filename is %s" % filekey) 
    output = yield mapreduce_pipeline.MapreducePipeline(
     "time_count", 
     "main.time_count_map", 
     "main.time_count_reduce", 
     "mapreduce.input_readers.BlobstoreZipInputReader", 
     "mapreduce.output_writers.BlobstoreOutputWriter", 
     mapper_params={ 
      "blob_key": blobkey, 
     }, 
     reducer_params={ 
      "mime_type": "text/plain", 
     }, 
     shards=32) 
    yield StoreOutput("TimeCount", filekey, output) 

Mapreduce.yaml:

mapreduce: 
- name: Make messages lowercase 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.lower_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 
- name: Make messages upper case 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.upper_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 

Phần còn lại của các tập tin được chính xác giống như demo.

Tôi đã tải lên một bản sao của mã của tôi trên dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

+0

Bạn có thể hiển thị cấu hình Mapreduce của mình không? Đối với một số lý do, có vẻ như bạn đang chuyển toàn bộ tệp tới người lập bản đồ, thay vì lập bản đồ trên nó theo từng dòng. –

+0

Xin chào Daniel, câu hỏi của tôi đã được chỉnh sửa. Cảm ơn, thực sự đánh giá cao nó! – autumngard

Trả lời

2

Có khả năng tập tin đầu vào của bạn vượt quá giới hạn bộ nhớ mềm trong kích thước. Đối với các tệp lớn, hãy sử dụng BlobstoreLineInputReader hoặc BlobstoreZipLineInputReader.

Những trình đọc đầu vào này vượt qua một cái gì đó khác với hàm map, chúng vượt qua start_position trong tệp và dòng văn bản.

map Chức năng của bạn có thể trông giống như:

def time_count_map(data): 
    """Time count map function.""" 
    text = data[1] 

    try: 
     reader = csv.reader([text.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
     logging.debug(e) 

Sử dụng BlobstoreLineInputReader sẽ cho phép các công việc để chạy nhanh hơn nhiều vì nó có thể sử dụng nhiều hơn một mảnh, lên đến 256, nhưng nó có nghĩa là bạn cần phải tải lên của bạn các tập tin không nén, có thể là một cơn đau. Tôi xử lý nó bằng cách tải các tệp nén lên máy chủ cửa sổ EC2, sau đó giải nén và tải lên từ đó, vì băng thông ngược dòng quá lớn.

+0

Điều này làm việc rất tốt cho tôi! Cảm ơn rất nhiều! :) – autumngard

6

Cũng xem xét gọi gc.collect() tại các điểm thông thường trong mã của bạn. Tôi đã nhìn thấy một số câu hỏi SO về vượt quá giới hạn bộ nhớ mềm đã được giảm bớt bằng cách gọi gc.collect(), hầu hết phải làm với blobstore.

+0

đang gọi gc.collect() chỉ áp dụng cho blobstore hoặc nói chung? – marcadian