Tôi đang làm việc trên hàm appengine-mapreduce và đã sửa đổi bản trình diễn cho phù hợp với mục đích của tôi. Về cơ bản, tôi có một triệu dòng trên định dạng sau: userid, time1, time2. Mục đích của tôi là tìm sự khác biệt giữa time1 và time2 cho mỗi userid.Giới hạn bộ nhớ được nhấn với appengine-mapreduce
Tuy nhiên, khi tôi chạy này trên Google App Engine, tôi gặp phải thông báo lỗi này trong phần log:
Đã vượt quá giới hạn bộ nhớ tin mềm với 180,56 MB sau khi phục vụ 130 yêu cầu tổng Trong khi xử lý yêu cầu này, quá trình xử lý yêu cầu này được phát hiện là sử dụng quá nhiều bộ nhớ và đã bị chấm dứt. Điều này có khả năng gây ra một quy trình mới được sử dụng cho yêu cầu tiếp theo đối với đơn đăng ký của bạn. Nếu bạn thấy thông báo này thường xuyên, bạn có thể bị rò rỉ bộ nhớ trong ứng dụng của mình.
def time_count_map(data):
"""Time count map function."""
(entry, text_fn) = data
text = text_fn()
try:
q = text.split('\n')
for m in q:
reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
for s in reader:
"""Calculate time elapsed"""
sdw = s[1]
start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
edw = s[2]
end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
time_difference = time.mktime(end_date) - time.mktime(start_date)
yield (s[0], time_difference)
except IndexError, e:
logging.debug(e)
def time_count_reduce(key, values):
"""Time count reduce function."""
time = 0.0
for subtime in values:
time += float(subtime)
realtime = int(time)
yield "%s: %d\n" % (key, realtime)
Mọi người có thể đề xuất cách khác để tối ưu hóa mã của tôi tốt hơn không? Cảm ơn!!
được sửa đổi:
Dưới đây là xử lý đường ống:
class TimeCountPipeline(base_handler.PipelineBase):
"""A pipeline to run Time count demo.
Args:
blobkey: blobkey to process as string. Should be a zip archive with
text files inside.
"""
def run(self, filekey, blobkey):
logging.debug("filename is %s" % filekey)
output = yield mapreduce_pipeline.MapreducePipeline(
"time_count",
"main.time_count_map",
"main.time_count_reduce",
"mapreduce.input_readers.BlobstoreZipInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"blob_key": blobkey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=32)
yield StoreOutput("TimeCount", filekey, output)
Mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
params:
- name: done_callback
value: /done
mapper:
handler: main.lower_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
- name: Make messages upper case
params:
- name: done_callback
value: /done
mapper:
handler: main.upper_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
Phần còn lại của các tập tin được chính xác giống như demo.
Tôi đã tải lên một bản sao của mã của tôi trên dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
Bạn có thể hiển thị cấu hình Mapreduce của mình không? Đối với một số lý do, có vẻ như bạn đang chuyển toàn bộ tệp tới người lập bản đồ, thay vì lập bản đồ trên nó theo từng dòng. –
Xin chào Daniel, câu hỏi của tôi đã được chỉnh sửa. Cảm ơn, thực sự đánh giá cao nó! – autumngard