2013-02-21 22 views
5

Tôi có một trình xử lý yêu cầu cập nhật một thực thể, lưu nó vào kho dữ liệu, sau đó cần thực hiện một số công việc bổ sung trước khi trở về (như xếp hàng tác vụ nền và tuần tự hóa một số kết quả). Tôi muốn song song mã này, để công việc bổ sung được thực hiện trong khi thực thể đang được lưu.Làm thế nào để ngăn chặn ndb từ đợt một cuộc gọi put_async() và làm cho nó phát hành RPC ngay lập tức?

Đây là những gì mã xử lý của tôi nắm xuống:

class FooHandler(webapp2.RequestHandler): 
    @ndb.toplevel 
    def post(self): 
     foo = yield Foo.get_by_id_async(some_id) 

     # Do some work with foo 

     # Don't yield, as I want to perform the code that follows 
     # while foo is being saved to the datastore. 
     # I'm in a toplevel, so the handler will not exit as long as 
     # this async request is not finished. 
     foo.put_async() 

     taskqueue.add(...) 
     json_result = generate_result() 
     self.response.headers["Content-Type"] = "application/json; charset=UTF-8" 
     self.response.write(json_result) 

Tuy nhiên, Appstats cho thấy datastore.Put RPC đang được thực hiện nối tiếp, sau khi taskqueue.Add:

Appstats screenshot

Một chút đào xung quanh trong ndb.context.py cho thấy rằng cuộc gọi put_async() sẽ được thêm vào AutoBatcher thay vì RPC được phát hành ngay lập tức.

Vì vậy, tôi giả sử rằng sẽ bị xóa khi hết toplevel chờ tất cả các cuộc gọi không đồng bộ được hoàn thành.

Tôi hiểu rằng việc đặt hàng loạt có lợi ích thực trong một số trường hợp, nhưng trong trường hợp của tôi ở đây tôi thực sự muốn RPC được gửi ngay lập tức, vì vậy tôi có thể thực hiện công việc khác trong khi thực thể đang được lưu.

Nếu tôi làm yield foo.put_async(), sau đó tôi nhận được thác nước cùng trong Appstats, nhưng với datastore.Put được thực hiện trước phần còn lại:

2nd Appstats screenshot

này là để được mong đợi, như yield làm handler tôi chờ đợi Gọi put_async() để hoàn thành trước khi thực hiện phần còn lại của mã.

Tôi cũng đã thử thêm một cuộc gọi đến ndb.get_context().flush() ngay sau foo.put_async(), nhưng các cuộc gọi datastore.Puttaskqueue.BulkAdd vẫn không được thực hiện song song theo Appstats.

Vì vậy, câu hỏi của tôi là: làm thế nào tôi có thể buộc cuộc gọi đến put_async() để bỏ qua bộ nạp tự động và phát hành RPC ngay lập tức?

+0

Sản phẩm có đang được sản xuất hoặc tại địa phương không? – Lipis

+0

Nó đang được sản xuất. –

Trả lời

6

Không có cách nào hỗ trợ để làm điều đó. Có lẽ nên có. Bạn có thể thử nếu điều này hoạt động?

loop - ndb.eventloop.get_event_loop() 
while loop.run_idle(): 
    pass 

Bạn có thể phải nhìn vào mã nguồn của NDB/eventloop.py để xem những gì khác bạn có thể thử - về cơ bản bạn muốn thử hầu hết những gì run0() không ngoại trừ chờ đợi RPC. Đặc biệt, nó có thể là bạn sẽ phải làm điều này:

while loop.current: 
    loop.run0() 
while loop.run_idle(): 
    pass 

(Điều này vẫn không được hỗ trợ, bởi vì có những điều kiện khác mà bạn có thể phải xử lý quá, nhưng những người dường như không xảy ra trong bạn ví dụ.)

+0

Tôi đã làm cho nó hoạt động bằng cách gọi 'ndb.get_context(). Flush()' theo sau bởi 2 vòng lặp mà bạn đã đề xuất ngay sau khi gọi hàm 'foo.put_async()'. Tôi tin rằng cần có một cách hỗ trợ chính thức để làm điều này, vì tôi không nghĩ rằng kịch bản sử dụng của tôi là không phổ biến (lưu một thực thể, sau đó kết thúc công việc xử lý còn lại trong khi thực thể đang được lưu). Tôi đã gửi yêu cầu tính năng cho nó: http://code.google.com/p/googleappengine/issues/detail?id=8863 –

+0

Tôi nghĩ điều thực sự cần thiết là taskqueue.add_async để nhận hàng đợi công việc rpc vào chế độ rảnh/vòng rpc trong eventloop. http://code.google.com/p/appengine-ndb-experiment/issues/detail?id=180 – tesdal

+0

Mã chính xác nào phù hợp với bạn? flush() là một tasklet, do đó bạn sẽ phải tạo ra nó, có thể gây ra sự chậm trễ nhiều hơn bạn muốn. Dù sao, đồng ý điều này sẽ là một tính năng hữu ích. Có lẽ nó sẽ nhận được nhiều sự chú ý hơn nếu bạn nộp nó trong trình theo dõi NDB? –

-2

Hãy thử điều này, tôi không chắc chắn 100% nó sẽ giúp:

foo = yield Foo.get_by_id_async(some_id) 
future = foo.put_async() 
future.done() 

yêu cầu NDB được đưa vào autobatcher, lô được gửi đến RPC khi bạn cần một kết quả. Vì bạn không cần kết quả của foo.put_async(), nó không được gửi cho đến khi bạn thực hiện một cuộc gọi ndb khác (bạn không) hoặc cho đến khi @ ndb.toplevel kết thúc.

Gọi trong tương lai.done() không chặn, nhưng tôi đoán nó có thể kích hoạt yêu cầu.

Một điều để cố gắng buộc các hoạt động là:

ndb.get_context().flush() 
+0

Cảm ơn, nhưng nó không làm gì cả. 'Future.done()' chỉ thực hiện 'return self._done' mà không cần xử lý, và tôi đã thử' Context.flush() '. –