python - load balancing broker doesnt work when clients and workers are separated out? -
alltogether.py
from __future__ import print_function import threading import time import zmq nbr_clients = 10 nbr_workers = 3 def worker_thread(worker_url, context, i): """ worker using req socket lru routing """ socket = context.socket(zmq.req) # set worker identity socket.identity = (u"worker-%d" % (i)).encode('ascii') socket.connect(worker_url) # tell borker ready work socket.send(b"ready") try: while true: address = socket.recv() empty = socket.recv() request = socket.recv() print("%s: %s\n" % (socket.identity.decode('ascii'), request.decode('ascii')), end='') socket.send(address, zmq.sndmore) socket.send(b"", zmq.sndmore) socket.send(b"ok") except zmq.contextterminated: # context terminated quit silently homecoming def client_thread(client_url, context, i): """ basic request-reply client using req socket """ socket = context.socket(zmq.req) socket.identity = (u"client-%d" % (i)).encode('ascii') socket.connect(client_url) # send request, reply socket.send(b"hello") reply = socket.recv() print("%s: %s\n" % (socket.identity.decode('ascii'), reply.decode('ascii')), end='') def main(): """ main method """ url_worker = "inproc://workers" url_client = "inproc://clients" client_nbr = nbr_clients # prepare our context , sockets context = zmq.context() frontend = context.socket(zmq.router) frontend.bind(url_client) backend = context.socket(zmq.router) backend.bind(url_worker) # create workers , clients threads in range(nbr_workers): thread = threading.thread(target=worker_thread, args=(url_worker, context, i, )) thread.start() in range(nbr_clients): thread_c = threading.thread(target=client_thread, args=(url_client, context, i, )) thread_c.start() # logic of lru loop # - poll backend always, frontend if 1+ worker ready # - if worker replies, queue worker ready , forwards reply # client if necessary # - if client requests, pop next worker , send request # queue of available workers available_workers = 0 workers_list = [] # init poller poller = zmq.poller() # poll worker activity on backend poller.register(backend, zmq.pollin) # poll front-end if have available workers poller.register(frontend, zmq.pollin) while true: socks = dict(poller.poll()) # handle worker activity on backend if (backend in socks , socks[backend] == zmq.pollin): # queue worker address lru routing worker_addr = backend.recv() assert available_workers < nbr_workers # add together worker list of workers available_workers += 1 workers_list.append(worker_addr) # sec frame empty empty = backend.recv() assert empty == b"" # 3rd frame ready or else client reply address client_addr = backend.recv() # if client reply, send rest frontend if client_addr != b"ready": # next frame empty empty = backend.recv() assert empty == b"" reply = backend.recv() frontend.send(client_addr, zmq.sndmore) frontend.send(b"", zmq.sndmore) frontend.send(reply) client_nbr -= 1 if client_nbr == 0: break # exit after n messages # poll on frontend if workers available if available_workers > 0: if (frontend in socks , socks[frontend] == zmq.pollin): # next client request, route lru worker # client request [address][empty][request] client_addr = frontend.recv() empty = frontend.recv() assert empty == b"" request = frontend.recv() # dequeue , drop next worker address available_workers -= 1 worker_id = workers_list.pop() backend.send(worker_id, zmq.sndmore) backend.send(b"", zmq.sndmore) backend.send(client_addr, zmq.sndmore) backend.send(b"", zmq.sndmore) backend.send(request) # out of infinite loop: housekeeping frontend.close() backend.close() context.term() if __name__ == "__main__": main()
the above works is.
cycrh6rtp35.rtp.netapp.com{}: ./alltogether.py worker-1: hello worker-0: hello worker-2: hello client-0: ok client-1: ok worker-1: hello worker-0: hello client-2: ok worker-2: hello client-5: ok client-4: ok worker-1: hello worker-2: hello client-3: ok client-6: ok worker-0: hello worker-1: hello client-7: ok client-8: ok client-9: ok
i wanted separated out logic 3 py files 1 worker 1 client 1 broker
broker.py
def do_lb(): url_worker = "inproc://workers" url_client = "inproc://clients" client_nbr = nbr_clients # prepare our context , sockets context = zmq.context() frontend = context.socket(zmq.router) frontend.bind(url_client) backend = context.socket(zmq.router) backend.bind(url_worker) # create workers , clients threads #raw_input("press come in continue...") #print 'continuing...' # logic of lru loop # - poll backend always, frontend if 1+ worker ready # - if worker replies, queue worker ready , forwards reply # client if necessary # - if client requests, pop next worker , send request # queue of available workers available_workers = 0 workers_list = [] # init poller poller = zmq.poller() # poll worker activity on backend poller.register(backend, zmq.pollin) # poll front-end if have available workers poller.register(frontend, zmq.pollin) while true: print 'in while true loop' socks = dict(poller.poll()) print 'out1' # handle worker activity on backend if (backend in socks , socks[backend] == zmq.pollin): print 'out11' # queue worker address lru routing worker_addr = backend.recv() assert available_workers < nbr_workers # add together worker list of workers available_workers += 1 workers_list.append(worker_addr) print 'out12' # sec frame empty empty = backend.recv() assert empty == b"" print 'out13' # 3rd frame ready or else client reply address client_addr = backend.recv() # if client reply, send rest frontend if client_addr != b"ready": # next frame empty empty = backend.recv() assert empty == b"" reply = backend.recv() frontend.send(client_addr, zmq.sndmore) frontend.send(b"", zmq.sndmore) frontend.send(reply) client_nbr -= 1 if client_nbr == 0: break # exit after n messages print 'out2' # poll on frontend if workers available if available_workers > 0: print 'workers available' if (frontend in socks , socks[frontend] == zmq.pollin): # next client request, route lru worker # client request [address][empty][request] client_addr = frontend.recv() empty = frontend.recv() assert empty == b"" request = frontend.recv() # dequeue , drop next worker address available_workers -= 1 worker_id = workers_list.pop() backend.send(worker_id, zmq.sndmore) backend.send(b"", zmq.sndmore) backend.send(client_addr, zmq.sndmore) backend.send(b"", zmq.sndmore) backend.send(request) else: print 'workers not available' # out of infinite loop: housekeeping frontend.close() backend.close() context.term()
worker.py
import random import sys import threading import time import zmq # using inproc def worker_thread(context, i): """ worker using req socket lru routing """ print 'creating worker %s' % worker_url = "inproc://workers" socket = context.socket(zmq.req) # set worker identity socket.identity = (u"worker-%d" % (i)).encode('ascii') socket.connect(worker_url) # tell borker ready work socket.send(b"ready") try: while true: print 'in while true loop, waiting...' address = socket.recv() empty = socket.recv() request = socket.recv() print("%s: %s\n" % (socket.identity.decode('ascii'), request.decode('ascii'))) socket.send(address, zmq.sndmore) socket.send(b"", zmq.sndmore) socket.send(b"ok") except zmq.contextterminated: # context terminated quit silently homecoming
and client.py
import random import sys import threading import time import zmq #using inproc def client_thread(context, i): """ basic request-reply client using req socket """ print 'creating client %s' % client_url = "inproc://clients" socket = context.socket(zmq.req) socket.identity = (u"client-%d" % (i)).encode('ascii') socket.connect(client_url) # send request, reply print 'sending hello' socket.send(b"hello") reply = socket.recv() print("%s: %s\n" % (socket.identity.decode('ascii'), reply.decode('ascii')))
i started broker.py first , workers.py , clients.py , hang , doing nothing.
anyone know why?
python zeromq pyzmq
No comments:
Post a Comment