
jnoller at gmail
Nov 21, 2009, 6:25 AM
Post #2 of 2
(125 views)
Permalink
|
|
Re: r76433 - in python/trunk: Doc/includes/mp_distributing.py Doc/library/multiprocessing.rst Lib/multiprocessing/queues.py
[In reply to]
|
|
Yes, I brain farted and accidentally pulled some in-progress debugging work into the checkin. I reverted the queues.py change on trunk and removed it from the merges that followed. On Sat, Nov 21, 2009 at 9:02 AM, jesse.noller <python-checkins [at] python> wrote: > Author: jesse.noller > Date: Sat Nov 21 15:01:56 2009 > New Revision: 76433 > > Log: > issue5738: The distribution example was confusing, and out of date. It's too large to include inline in the docs as well. It belongs in an addons module outside the stdlib. Removing. > > Removed: > python/trunk/Doc/includes/mp_distributing.py > Modified: > python/trunk/Doc/library/multiprocessing.rst > python/trunk/Lib/multiprocessing/queues.py > > Deleted: python/trunk/Doc/includes/mp_distributing.py > ============================================================================== > --- python/trunk/Doc/includes/mp_distributing.py Sat Nov 21 15:01:56 2009 > +++ (empty file) > @@ -1,364 +0,0 @@ > -# > -# Module to allow spawning of processes on foreign host > -# > -# Depends on `multiprocessing` package -- tested with `processing-0.60` > -# > -# Copyright (c) 2006-2008, R Oudkerk > -# All rights reserved. 
> -# > - > -__all__ = ['Cluster', 'Host', 'get_logger', 'current_process'] > - > -# > -# Imports > -# > - > -import sys > -import os > -import tarfile > -import shutil > -import subprocess > -import logging > -import itertools > -import Queue > - > -try: > - import cPickle as pickle > -except ImportError: > - import pickle > - > -from multiprocessing import Process, current_process, cpu_count > -from multiprocessing import util, managers, connection, forking, pool > - > -# > -# Logging > -# > - > -def get_logger(): > - return _logger > - > -_logger = logging.getLogger('distributing') > -_logger.propagate = 0 > - > -_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT) > -_handler = logging.StreamHandler() > -_handler.setFormatter(_formatter) > -_logger.addHandler(_handler) > - > -info = _logger.info > -debug = _logger.debug > - > -# > -# Get number of cpus > -# > - > -try: > - slot_count = cpu_count() > -except NotImplemented: > - slot_count = 1 > - > -# > -# Manager type which spawns subprocesses > -# > - > -class HostManager(managers.SyncManager): > - ''' > - Manager type used for spawning processes on a (presumably) foreign host > - ''' > - def __init__(self, address, authkey): > - managers.SyncManager.__init__(self, address, authkey) > - self._name = 'Host-unknown' > - > - def Process(self, group=None, target=None, name=None, args=(), kwargs={}): > - if hasattr(sys.modules['__main__'], '__file__'): > - main_path = os.path.basename(sys.modules['__main__'].__file__) > - else: > - main_path = None > - data = pickle.dumps((target, args, kwargs)) > - p = self._RemoteProcess(data, main_path) > - if name is None: > - temp = self._name.split('Host-')[-1] + '/Process-%s' > - name = temp % ':'.join(map(str, p.get_identity())) > - p.set_name(name) > - return p > - > - @classmethod > - def from_address(cls, address, authkey): > - manager = cls(address, authkey) > - managers.transact(address, authkey, 'dummy') > - manager._state.value = managers.State.STARTED > - 
manager._name = 'Host-%s:%s' % manager.address > - manager.shutdown = util.Finalize( > - manager, HostManager._finalize_host, > - args=(manager._address, manager._authkey, manager._name), > - exitpriority=-10 > - ) > - return manager > - > - @staticmethod > - def _finalize_host(address, authkey, name): > - managers.transact(address, authkey, 'shutdown') > - > - def __repr__(self): > - return '<Host(%s)>' % self._name > - > -# > -# Process subclass representing a process on (possibly) a remote machine > -# > - > -class RemoteProcess(Process): > - ''' > - Represents a process started on a remote host > - ''' > - def __init__(self, data, main_path): > - assert not main_path or os.path.basename(main_path) == main_path > - Process.__init__(self) > - self._data = data > - self._main_path = main_path > - > - def _bootstrap(self): > - forking.prepare({'main_path': self._main_path}) > - self._target, self._args, self._kwargs = pickle.loads(self._data) > - return Process._bootstrap(self) > - > - def get_identity(self): > - return self._identity > - > -HostManager.register('_RemoteProcess', RemoteProcess) > - > -# > -# A Pool class that uses a cluster > -# > - > -class DistributedPool(pool.Pool): > - > - def __init__(self, cluster, processes=None, initializer=None, initargs=()): > - self._cluster = cluster > - self.Process = cluster.Process > - pool.Pool.__init__(self, processes or len(cluster), > - initializer, initargs) > - > - def _setup_queues(self): > - self._inqueue = self._cluster._SettableQueue() > - self._outqueue = self._cluster._SettableQueue() > - self._quick_put = self._inqueue.put > - self._quick_get = self._outqueue.get > - > - @staticmethod > - def _help_stuff_finish(inqueue, task_handler, size): > - inqueue.set_contents([None] * size) > - > -# > -# Manager type which starts host managers on other machines > -# > - > -def LocalProcess(**kwds): > - p = Process(**kwds) > - p.set_name('localhost/' + p.name) > - return p > - > -class Cluster(managers.SyncManager): 
> - ''' > - Represents collection of slots running on various hosts. > - > - `Cluster` is a subclass of `SyncManager` so it allows creation of > - various types of shared objects. > - ''' > - def __init__(self, hostlist, modules): > - managers.SyncManager.__init__(self, address=('localhost', 0)) > - self._hostlist = hostlist > - self._modules = modules > - if __name__ not in modules: > - modules.append(__name__) > - files = [sys.modules[name].__file__ for name in modules] > - for i, file in enumerate(files): > - if file.endswith('.pyc') or file.endswith('.pyo'): > - files[i] = file[:-4] + '.py' > - self._files = [os.path.abspath(file) for file in files] > - > - def start(self): > - managers.SyncManager.start(self) > - > - l = connection.Listener(family='AF_INET', authkey=self._authkey) > - > - for i, host in enumerate(self._hostlist): > - host._start_manager(i, self._authkey, l.address, self._files) > - > - for host in self._hostlist: > - if host.hostname != 'localhost': > - conn = l.accept() > - i, address, cpus = conn.recv() > - conn.close() > - other_host = self._hostlist[i] > - other_host.manager = HostManager.from_address(address, > - self._authkey) > - other_host.slots = other_host.slots or cpus > - other_host.Process = other_host.manager.Process > - else: > - host.slots = host.slots or slot_count > - host.Process = LocalProcess > - > - self._slotlist = [
> - Slot(host) for host in self._hostlist for i in range(host.slots) > - ] > - self._slot_iterator = itertools.cycle(self._slotlist) > - self._base_shutdown = self.shutdown > - del self.shutdown > - > - def shutdown(self): > - for host in self._hostlist: > - if host.hostname != 'localhost': > - host.manager.shutdown() > - self._base_shutdown() > - > - def Process(self, group=None, target=None, name=None, args=(), kwargs={}): > - slot = self._slot_iterator.next() > - return slot.Process( > - group=group, target=target, name=name, args=args, kwargs=kwargs > - ) > - > - def Pool(self, processes=None, initializer=None, initargs=()): > - return DistributedPool(self, processes, initializer, initargs) > - > - def __getitem__(self, i): > - return self._slotlist[i] > - > - def __len__(self): > - return len(self._slotlist) > - > - def __iter__(self): > - return iter(self._slotlist) > - > -# > -# Queue subclass used by distributed pool > -# > - > -class SettableQueue(Queue.Queue): > - def empty(self): > - return not self.queue > - def full(self): > - return self.maxsize > 0 and len(self.queue) == self.maxsize > - def set_contents(self, contents): > - # length of contents must be at least as large as the number of > - # threads which have potentially called get() > - self.not_empty.acquire() > - try: > - self.queue.clear() > - self.queue.extend(contents) > - self.not_empty.notifyAll() > - finally: > - self.not_empty.release() > - > -Cluster.register('_SettableQueue', SettableQueue) > - > -# > -# Class representing a notional cpu in the cluster > -# > - > -class Slot(object): > - def __init__(self, host): > - self.host = host > - self.Process = host.Process > - > -# > -# Host > -# > - > -class Host(object): > - ''' > - Represents a host to use as a node in a cluster. > - > - `hostname` gives the name of the host. If hostname is not > - "localhost" then ssh is used to log in to the host. 
To log in as > - a different user use a host name of the form > - "username [at] somewhere" > - > - `slots` is used to specify the number of slots for processes on > - the host. This affects how often processes will be allocated to > - this host. Normally this should be equal to the number of cpus on > - that host. > - ''' > - def __init__(self, hostname, slots=None): > - self.hostname = hostname > - self.slots = slots > - > - def _start_manager(self, index, authkey, address, files): > - if self.hostname != 'localhost': > - tempdir = copy_to_remote_temporary_directory(self.hostname, files) > - debug('startup files copied to %s:%s', self.hostname, tempdir) > - p = subprocess.Popen( > - ['ssh', self.hostname, 'python', '-c', > - '"import os; os.chdir(%r); ' > - 'from distributing import main; main()"' % tempdir], > - stdin=subprocess.PIPE > - ) > - data = dict( > - name='BoostrappingHost', index=index, > - dist_log_level=_logger.getEffectiveLevel(), > - dir=tempdir, authkey=str(authkey), parent_address=address > - ) > - pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL) > - p.stdin.close() > - > -# > -# Copy files to remote directory, returning name of directory > -# > - > -unzip_code = '''" > -import tempfile, os, sys, tarfile > -tempdir = tempfile.mkdtemp(prefix='distrib-') > -os.chdir(tempdir) > -tf = tarfile.open(fileobj=sys.stdin, mode='r|gz') > -for ti in tf: > - tf.extract(ti) > -print tempdir > -"''' > - > -def copy_to_remote_temporary_directory(host, files): > - p = subprocess.Popen( > - ['ssh', host, 'python', '-c', unzip_code], > - stdout=subprocess.PIPE, stdin=subprocess.PIPE > - ) > - tf = tarfile.open(fileobj=p.stdin, mode='w|gz') > - for name in files: > - tf.add(name, os.path.basename(name)) > - tf.close() > - p.stdin.close() > - return p.stdout.read().rstrip() > - > -# > -# Code which runs a host manager > -# > - > -def main(): > - # get data from parent over stdin > - data = pickle.load(sys.stdin) > - sys.stdin.close() > - > - # set some stuff > - 
_logger.setLevel(data['dist_log_level']) > - forking.prepare(data) > - > - # create server for a `HostManager` object > - server = managers.Server(HostManager._registry, ('', 0), data['authkey']) > - current_process()._server = server > - > - # report server address and number of cpus back to parent > - conn = connection.Client(data['parent_address'], authkey=data['authkey']) > - conn.send((data['index'], server.address, slot_count)) > - conn.close() > - > - # set name etc > - current_process().set_name('Host-%s:%s' % server.address) > - util._run_after_forkers() > - > - # register a cleanup function > - def cleanup(directory): > - debug('removing directory %s', directory) > - shutil.rmtree(directory) > - debug('shutting down host manager') > - util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0) > - > - # start host manager > - debug('remote host manager starting in %s', data['dir']) > - server.serve_forever() > > Modified: python/trunk/Doc/library/multiprocessing.rst > ============================================================================== > --- python/trunk/Doc/library/multiprocessing.rst (original) > +++ python/trunk/Doc/library/multiprocessing.rst Sat Nov 21 15:01:56 2009 > @@ -2230,10 +2230,3 @@ > > .. literalinclude:: ../includes/mp_benchmarks.py > > -An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process` > -and others to build a system which can distribute processes and work via a > -distributed queue to a "cluster" of machines on a network, accessible via SSH. > -You will need to have private key authentication for all hosts configured for > -this to work. > - > -.. 
literalinclude:: ../includes/mp_distributing.py > > Modified: python/trunk/Lib/multiprocessing/queues.py > ============================================================================== > --- python/trunk/Lib/multiprocessing/queues.py (original) > +++ python/trunk/Lib/multiprocessing/queues.py Sat Nov 21 15:01:56 2009 > @@ -47,6 +47,8 @@ > if sys.platform != 'win32': > register_after_fork(self, Queue._after_fork) > > + self.getv = 0 > + > def __getstate__(self): > assert_spawning(self) > return (self._maxsize, self._reader, self._writer, > @@ -71,6 +73,8 @@ > self._poll = self._reader.poll > > def put(self, obj, block=True, timeout=None): > + if not isinstance(obj, list): > + debug('put: %s', obj) > assert not self._closed > if not self._sem.acquire(block, timeout): > raise Full > @@ -85,11 +89,15 @@ > self._notempty.release() > > def get(self, block=True, timeout=None): > + self.getv += 1 > + debug('self.getv: %s', self.getv) > if block and timeout is None: > self._rlock.acquire() > try: > res = self._recv() > self._sem.release() > + if not isinstance(res, list): > + debug('get: %s', res) > return res > finally: > self._rlock.release() > @@ -104,6 +112,8 @@ > raise Empty > res = self._recv() > self._sem.release() > + if not isinstance(res, list): > + debug('get: %s', res) > return res > finally: > self._rlock.release() > @@ -229,16 +239,22 @@ > try: > while 1: > obj = bpopleft() > + if not isinstance(obj, list): > + debug('feeder thread got: %s', obj) > if obj is sentinel: > debug('feeder thread got sentinel -- exiting') > close() > return > - > if wacquire is None: > + if not isinstance(obj, list): > + debug('sending to pipe: %s', obj) > send(obj) > else: > - wacquire() > + debug('waiting on wacquire') > + wacquire(timeout=30) > try: > + if not isinstance(obj, list): > + debug('sending to pipe: %s', obj) > send(obj) > finally: > wrelease() > _______________________________________________ > Python-checkins mailing list > Python-checkins [at] python > 
http://mail.python.org/mailman/listinfo/python-checkins > _______________________________________________ Python-checkins mailing list Python-checkins [at] python http://mail.python.org/mailman/listinfo/python-checkins
|