Login | Register For Free | Help
Search for: (Advanced)

Mailing List Archive: Python: Checkins

r76433 - in python/trunk: Doc/includes/mp_distributing.py Doc/library/multiprocessing.rst Lib/multiprocessing/queues.py

 

 

Python checkins RSS feed   Index | Next | Previous | View Threaded


python-checkins at python

Nov 21, 2009, 6:20 AM

Post #1 of 2 (134 views)
Permalink
r76433 - in python/trunk: Doc/includes/mp_distributing.py Doc/library/multiprocessing.rst Lib/multiprocessing/queues.py

Author: jesse.noller
Date: Sat Nov 21 15:01:56 2009
New Revision: 76433

Log:
issue5738: The distribution example was confusing, and out of date. It's too large to include inline in the docs as well. It belongs in an addons module outside the stdlib. Removing.

Removed:
python/trunk/Doc/includes/mp_distributing.py
Modified:
python/trunk/Doc/library/multiprocessing.rst
python/trunk/Lib/multiprocessing/queues.py

Deleted: python/trunk/Doc/includes/mp_distributing.py
==============================================================================
--- python/trunk/Doc/includes/mp_distributing.py Sat Nov 21 15:01:56 2009
+++ (empty file)
@@ -1,364 +0,0 @@
-#
-# Module to allow spawning of processes on foreign host
-#
-# Depends on `multiprocessing` package -- tested with `processing-0.60`
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
-
-#
-# Imports
-#
-
-import sys
-import os
-import tarfile
-import shutil
-import subprocess
-import logging
-import itertools
-import Queue
-
-try:
- import cPickle as pickle
-except ImportError:
- import pickle
-
-from multiprocessing import Process, current_process, cpu_count
-from multiprocessing import util, managers, connection, forking, pool
-
-#
-# Logging
-#
-
-def get_logger():
- return _logger
-
-_logger = logging.getLogger('distributing')
-_logger.propagate = 0
-
-_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
-_handler = logging.StreamHandler()
-_handler.setFormatter(_formatter)
-_logger.addHandler(_handler)
-
-info = _logger.info
-debug = _logger.debug
-
-#
-# Get number of cpus
-#
-
-try:
- slot_count = cpu_count()
-except NotImplemented:
- slot_count = 1
-
-#
-# Manager type which spawns subprocesses
-#
-
-class HostManager(managers.SyncManager):
- '''
- Manager type used for spawning processes on a (presumably) foreign host
- '''
- def __init__(self, address, authkey):
- managers.SyncManager.__init__(self, address, authkey)
- self._name = 'Host-unknown'
-
- def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
- if hasattr(sys.modules['__main__'], '__file__'):
- main_path = os.path.basename(sys.modules['__main__'].__file__)
- else:
- main_path = None
- data = pickle.dumps((target, args, kwargs))
- p = self._RemoteProcess(data, main_path)
- if name is None:
- temp = self._name.split('Host-')[-1] + '/Process-%s'
- name = temp % ':'.join(map(str, p.get_identity()))
- p.set_name(name)
- return p
-
- @classmethod
- def from_address(cls, address, authkey):
- manager = cls(address, authkey)
- managers.transact(address, authkey, 'dummy')
- manager._state.value = managers.State.STARTED
- manager._name = 'Host-%s:%s' % manager.address
- manager.shutdown = util.Finalize(
- manager, HostManager._finalize_host,
- args=(manager._address, manager._authkey, manager._name),
- exitpriority=-10
- )
- return manager
-
- @staticmethod
- def _finalize_host(address, authkey, name):
- managers.transact(address, authkey, 'shutdown')
-
- def __repr__(self):
- return '<Host(%s)>' % self._name
-
-#
-# Process subclass representing a process on (possibly) a remote machine
-#
-
-class RemoteProcess(Process):
- '''
- Represents a process started on a remote host
- '''
- def __init__(self, data, main_path):
- assert not main_path or os.path.basename(main_path) == main_path
- Process.__init__(self)
- self._data = data
- self._main_path = main_path
-
- def _bootstrap(self):
- forking.prepare({'main_path': self._main_path})
- self._target, self._args, self._kwargs = pickle.loads(self._data)
- return Process._bootstrap(self)
-
- def get_identity(self):
- return self._identity
-
-HostManager.register('_RemoteProcess', RemoteProcess)
-
-#
-# A Pool class that uses a cluster
-#
-
-class DistributedPool(pool.Pool):
-
- def __init__(self, cluster, processes=None, initializer=None, initargs=()):
- self._cluster = cluster
- self.Process = cluster.Process
- pool.Pool.__init__(self, processes or len(cluster),
- initializer, initargs)
-
- def _setup_queues(self):
- self._inqueue = self._cluster._SettableQueue()
- self._outqueue = self._cluster._SettableQueue()
- self._quick_put = self._inqueue.put
- self._quick_get = self._outqueue.get
-
- @staticmethod
- def _help_stuff_finish(inqueue, task_handler, size):
- inqueue.set_contents([None] * size)
-
-#
-# Manager type which starts host managers on other machines
-#
-
-def LocalProcess(**kwds):
- p = Process(**kwds)
- p.set_name('localhost/' + p.name)
- return p
-
-class Cluster(managers.SyncManager):
- '''
- Represents collection of slots running on various hosts.
-
- `Cluster` is a subclass of `SyncManager` so it allows creation of
- various types of shared objects.
- '''
- def __init__(self, hostlist, modules):
- managers.SyncManager.__init__(self, address=('localhost', 0))
- self._hostlist = hostlist
- self._modules = modules
- if __name__ not in modules:
- modules.append(__name__)
- files = [sys.modules[name].__file__ for name in modules]
- for i, file in enumerate(files):
- if file.endswith('.pyc') or file.endswith('.pyo'):
- files[i] = file[:-4] + '.py'
- self._files = [os.path.abspath(file) for file in files]
-
- def start(self):
- managers.SyncManager.start(self)
-
- l = connection.Listener(family='AF_INET', authkey=self._authkey)
-
- for i, host in enumerate(self._hostlist):
- host._start_manager(i, self._authkey, l.address, self._files)
-
- for host in self._hostlist:
- if host.hostname != 'localhost':
- conn = l.accept()
- i, address, cpus = conn.recv()
- conn.close()
- other_host = self._hostlist[i]
- other_host.manager = HostManager.from_address(address,
- self._authkey)
- other_host.slots = other_host.slots or cpus
- other_host.Process = other_host.manager.Process
- else:
- host.slots = host.slots or slot_count
- host.Process = LocalProcess
-
- self._slotlist = [.
- Slot(host) for host in self._hostlist for i in range(host.slots)
- ]
- self._slot_iterator = itertools.cycle(self._slotlist)
- self._base_shutdown = self.shutdown
- del self.shutdown
-
- def shutdown(self):
- for host in self._hostlist:
- if host.hostname != 'localhost':
- host.manager.shutdown()
- self._base_shutdown()
-
- def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
- slot = self._slot_iterator.next()
- return slot.Process(
- group=group, target=target, name=name, args=args, kwargs=kwargs
- )
-
- def Pool(self, processes=None, initializer=None, initargs=()):
- return DistributedPool(self, processes, initializer, initargs)
-
- def __getitem__(self, i):
- return self._slotlist[i]
-
- def __len__(self):
- return len(self._slotlist)
-
- def __iter__(self):
- return iter(self._slotlist)
-
-#
-# Queue subclass used by distributed pool
-#
-
-class SettableQueue(Queue.Queue):
- def empty(self):
- return not self.queue
- def full(self):
- return self.maxsize > 0 and len(self.queue) == self.maxsize
- def set_contents(self, contents):
- # length of contents must be at least as large as the number of
- # threads which have potentially called get()
- self.not_empty.acquire()
- try:
- self.queue.clear()
- self.queue.extend(contents)
- self.not_empty.notifyAll()
- finally:
- self.not_empty.release()
-
-Cluster.register('_SettableQueue', SettableQueue)
-
-#
-# Class representing a notional cpu in the cluster
-#
-
-class Slot(object):
- def __init__(self, host):
- self.host = host
- self.Process = host.Process
-
-#
-# Host
-#
-
-class Host(object):
- '''
- Represents a host to use as a node in a cluster.
-
- `hostname` gives the name of the host. If hostname is not
- "localhost" then ssh is used to log in to the host. To log in as
- a different user use a host name of the form
- "username [at] somewhere"
-
- `slots` is used to specify the number of slots for processes on
- the host. This affects how often processes will be allocated to
- this host. Normally this should be equal to the number of cpus on
- that host.
- '''
- def __init__(self, hostname, slots=None):
- self.hostname = hostname
- self.slots = slots
-
- def _start_manager(self, index, authkey, address, files):
- if self.hostname != 'localhost':
- tempdir = copy_to_remote_temporary_directory(self.hostname, files)
- debug('startup files copied to %s:%s', self.hostname, tempdir)
- p = subprocess.Popen(
- ['ssh', self.hostname, 'python', '-c',
- '"import os; os.chdir(%r); '
- 'from distributing import main; main()"' % tempdir],
- stdin=subprocess.PIPE
- )
- data = dict(
- name='BoostrappingHost', index=index,
- dist_log_level=_logger.getEffectiveLevel(),
- dir=tempdir, authkey=str(authkey), parent_address=address
- )
- pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
- p.stdin.close()
-
-#
-# Copy files to remote directory, returning name of directory
-#
-
-unzip_code = '''"
-import tempfile, os, sys, tarfile
-tempdir = tempfile.mkdtemp(prefix='distrib-')
-os.chdir(tempdir)
-tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
-for ti in tf:
- tf.extract(ti)
-print tempdir
-"'''
-
-def copy_to_remote_temporary_directory(host, files):
- p = subprocess.Popen(
- ['ssh', host, 'python', '-c', unzip_code],
- stdout=subprocess.PIPE, stdin=subprocess.PIPE
- )
- tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
- for name in files:
- tf.add(name, os.path.basename(name))
- tf.close()
- p.stdin.close()
- return p.stdout.read().rstrip()
-
-#
-# Code which runs a host manager
-#
-
-def main():
- # get data from parent over stdin
- data = pickle.load(sys.stdin)
- sys.stdin.close()
-
- # set some stuff
- _logger.setLevel(data['dist_log_level'])
- forking.prepare(data)
-
- # create server for a `HostManager` object
- server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
- current_process()._server = server
-
- # report server address and number of cpus back to parent
- conn = connection.Client(data['parent_address'], authkey=data['authkey'])
- conn.send((data['index'], server.address, slot_count))
- conn.close()
-
- # set name etc
- current_process().set_name('Host-%s:%s' % server.address)
- util._run_after_forkers()
-
- # register a cleanup function
- def cleanup(directory):
- debug('removing directory %s', directory)
- shutil.rmtree(directory)
- debug('shutting down host manager')
- util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
-
- # start host manager
- debug('remote host manager starting in %s', data['dir'])
- server.serve_forever()

Modified: python/trunk/Doc/library/multiprocessing.rst
==============================================================================
--- python/trunk/Doc/library/multiprocessing.rst (original)
+++ python/trunk/Doc/library/multiprocessing.rst Sat Nov 21 15:01:56 2009
@@ -2230,10 +2230,3 @@

.. literalinclude:: ../includes/mp_benchmarks.py

-An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process`
-and others to build a system which can distribute processes and work via a
-distributed queue to a "cluster" of machines on a network, accessible via SSH.
-You will need to have private key authentication for all hosts configured for
-this to work.
-
-.. literalinclude:: ../includes/mp_distributing.py

Modified: python/trunk/Lib/multiprocessing/queues.py
==============================================================================
--- python/trunk/Lib/multiprocessing/queues.py (original)
+++ python/trunk/Lib/multiprocessing/queues.py Sat Nov 21 15:01:56 2009
@@ -47,6 +47,8 @@
if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)

+ self.getv = 0
+
def __getstate__(self):
assert_spawning(self)
return (self._maxsize, self._reader, self._writer,
@@ -71,6 +73,8 @@
self._poll = self._reader.poll

def put(self, obj, block=True, timeout=None):
+ if not isinstance(obj, list):
+ debug('put: %s', obj)
assert not self._closed
if not self._sem.acquire(block, timeout):
raise Full
@@ -85,11 +89,15 @@
self._notempty.release()

def get(self, block=True, timeout=None):
+ self.getv += 1
+ debug('self.getv: %s', self.getv)
if block and timeout is None:
self._rlock.acquire()
try:
res = self._recv()
self._sem.release()
+ if not isinstance(res, list):
+ debug('get: %s', res)
return res
finally:
self._rlock.release()
@@ -104,6 +112,8 @@
raise Empty
res = self._recv()
self._sem.release()
+ if not isinstance(res, list):
+ debug('get: %s', res)
return res
finally:
self._rlock.release()
@@ -229,16 +239,22 @@
try:
while 1:
obj = bpopleft()
+ if not isinstance(obj, list):
+ debug('feeder thread got: %s', obj)
if obj is sentinel:
debug('feeder thread got sentinel -- exiting')
close()
return
-
if wacquire is None:
+ if not isinstance(obj, list):
+ debug('sending to pipe: %s', obj)
send(obj)
else:
- wacquire()
+ debug('waiting on wacquire')
+ wacquire(timeout=30)
try:
+ if not isinstance(obj, list):
+ debug('sending to pipe: %s', obj)
send(obj)
finally:
wrelease()
_______________________________________________
Python-checkins mailing list
Python-checkins [at] python
http://mail.python.org/mailman/listinfo/python-checkins


jnoller at gmail

Nov 21, 2009, 6:25 AM

Post #2 of 2 (125 views)
Permalink
Re: r76433 - in python/trunk: Doc/includes/mp_distributing.py Doc/library/multiprocessing.rst Lib/multiprocessing/queues.py [In reply to]

Yes, I brain farted and pulled back in some debugging work I was in
the middle of in the checkin, I reverted the queues.py change on trunk
and removed it from the merges that followed.

On Sat, Nov 21, 2009 at 9:02 AM, jesse.noller
<python-checkins [at] python> wrote:
> Author: jesse.noller
> Date: Sat Nov 21 15:01:56 2009
> New Revision: 76433
>
> Log:
> issue5738: The distribution example was confusing, and out of date. It's too large to include inline in the docs as well. It belongs in an addons module outside the stdlib. Removing.
>
> Removed:
>   python/trunk/Doc/includes/mp_distributing.py
> Modified:
>   python/trunk/Doc/library/multiprocessing.rst
>   python/trunk/Lib/multiprocessing/queues.py
>
> Deleted: python/trunk/Doc/includes/mp_distributing.py
> ==============================================================================
> --- python/trunk/Doc/includes/mp_distributing.py        Sat Nov 21 15:01:56 2009
> +++ (empty file)
> @@ -1,364 +0,0 @@
> -#
> -# Module to allow spawning of processes on foreign host
> -#
> -# Depends on `multiprocessing` package -- tested with `processing-0.60`
> -#
> -# Copyright (c) 2006-2008, R Oudkerk
> -# All rights reserved.
> -#
> -
> -__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
> -
> -#
> -# Imports
> -#
> -
> -import sys
> -import os
> -import tarfile
> -import shutil
> -import subprocess
> -import logging
> -import itertools
> -import Queue
> -
> -try:
> -    import cPickle as pickle
> -except ImportError:
> -    import pickle
> -
> -from multiprocessing import Process, current_process, cpu_count
> -from multiprocessing import util, managers, connection, forking, pool
> -
> -#
> -# Logging
> -#
> -
> -def get_logger():
> -    return _logger
> -
> -_logger = logging.getLogger('distributing')
> -_logger.propagate = 0
> -
> -_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
> -_handler = logging.StreamHandler()
> -_handler.setFormatter(_formatter)
> -_logger.addHandler(_handler)
> -
> -info = _logger.info
> -debug = _logger.debug
> -
> -#
> -# Get number of cpus
> -#
> -
> -try:
> -    slot_count = cpu_count()
> -except NotImplemented:
> -    slot_count = 1
> -
> -#
> -# Manager type which spawns subprocesses
> -#
> -
> -class HostManager(managers.SyncManager):
> -    '''
> -    Manager type used for spawning processes on a (presumably) foreign host
> -    '''
> -    def __init__(self, address, authkey):
> -        managers.SyncManager.__init__(self, address, authkey)
> -        self._name = 'Host-unknown'
> -
> -    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
> -        if hasattr(sys.modules['__main__'], '__file__'):
> -            main_path = os.path.basename(sys.modules['__main__'].__file__)
> -        else:
> -            main_path = None
> -        data = pickle.dumps((target, args, kwargs))
> -        p = self._RemoteProcess(data, main_path)
> -        if name is None:
> -            temp = self._name.split('Host-')[-1] + '/Process-%s'
> -            name = temp % ':'.join(map(str, p.get_identity()))
> -        p.set_name(name)
> -        return p
> -
> -    @classmethod
> -    def from_address(cls, address, authkey):
> -        manager = cls(address, authkey)
> -        managers.transact(address, authkey, 'dummy')
> -        manager._state.value = managers.State.STARTED
> -        manager._name = 'Host-%s:%s' % manager.address
> -        manager.shutdown = util.Finalize(
> -            manager, HostManager._finalize_host,
> -            args=(manager._address, manager._authkey, manager._name),
> -            exitpriority=-10
> -            )
> -        return manager
> -
> -    @staticmethod
> -    def _finalize_host(address, authkey, name):
> -        managers.transact(address, authkey, 'shutdown')
> -
> -    def __repr__(self):
> -        return '<Host(%s)>' % self._name
> -
> -#
> -# Process subclass representing a process on (possibly) a remote machine
> -#
> -
> -class RemoteProcess(Process):
> -    '''
> -    Represents a process started on a remote host
> -    '''
> -    def __init__(self, data, main_path):
> -        assert not main_path or os.path.basename(main_path) == main_path
> -        Process.__init__(self)
> -        self._data = data
> -        self._main_path = main_path
> -
> -    def _bootstrap(self):
> -        forking.prepare({'main_path': self._main_path})
> -        self._target, self._args, self._kwargs = pickle.loads(self._data)
> -        return Process._bootstrap(self)
> -
> -    def get_identity(self):
> -        return self._identity
> -
> -HostManager.register('_RemoteProcess', RemoteProcess)
> -
> -#
> -# A Pool class that uses a cluster
> -#
> -
> -class DistributedPool(pool.Pool):
> -
> -    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
> -        self._cluster = cluster
> -        self.Process = cluster.Process
> -        pool.Pool.__init__(self, processes or len(cluster),
> -                           initializer, initargs)
> -
> -    def _setup_queues(self):
> -        self._inqueue = self._cluster._SettableQueue()
> -        self._outqueue = self._cluster._SettableQueue()
> -        self._quick_put = self._inqueue.put
> -        self._quick_get = self._outqueue.get
> -
> -    @staticmethod
> -    def _help_stuff_finish(inqueue, task_handler, size):
> -        inqueue.set_contents([None] * size)
> -
> -#
> -# Manager type which starts host managers on other machines
> -#
> -
> -def LocalProcess(**kwds):
> -    p = Process(**kwds)
> -    p.set_name('localhost/' + p.name)
> -    return p
> -
> -class Cluster(managers.SyncManager):
> -    '''
> -    Represents collection of slots running on various hosts.
> -
> -    `Cluster` is a subclass of `SyncManager` so it allows creation of
> -    various types of shared objects.
> -    '''
> -    def __init__(self, hostlist, modules):
> -        managers.SyncManager.__init__(self, address=('localhost', 0))
> -        self._hostlist = hostlist
> -        self._modules = modules
> -        if __name__ not in modules:
> -            modules.append(__name__)
> -        files = [sys.modules[name].__file__ for name in modules]
> -        for i, file in enumerate(files):
> -            if file.endswith('.pyc') or file.endswith('.pyo'):
> -                files[i] = file[:-4] + '.py'
> -        self._files = [os.path.abspath(file) for file in files]
> -
> -    def start(self):
> -        managers.SyncManager.start(self)
> -
> -        l = connection.Listener(family='AF_INET', authkey=self._authkey)
> -
> -        for i, host in enumerate(self._hostlist):
> -            host._start_manager(i, self._authkey, l.address, self._files)
> -
> -        for host in self._hostlist:
> -            if host.hostname != 'localhost':
> -                conn = l.accept()
> -                i, address, cpus = conn.recv()
> -                conn.close()
> -                other_host = self._hostlist[i]
> -                other_host.manager = HostManager.from_address(address,
> -                                                              self._authkey)
> -                other_host.slots = other_host.slots or cpus
> -                other_host.Process = other_host.manager.Process
> -            else:
> -                host.slots = host.slots or slot_count
> -                host.Process = LocalProcess
> -
> -        self._slotlist = [.
> -            Slot(host) for host in self._hostlist for i in range(host.slots)
> -            ]
> -        self._slot_iterator = itertools.cycle(self._slotlist)
> -        self._base_shutdown = self.shutdown
> -        del self.shutdown
> -
> -    def shutdown(self):
> -        for host in self._hostlist:
> -            if host.hostname != 'localhost':
> -                host.manager.shutdown()
> -        self._base_shutdown()
> -
> -    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
> -        slot = self._slot_iterator.next()
> -        return slot.Process(
> -            group=group, target=target, name=name, args=args, kwargs=kwargs
> -            )
> -
> -    def Pool(self, processes=None, initializer=None, initargs=()):
> -        return DistributedPool(self, processes, initializer, initargs)
> -
> -    def __getitem__(self, i):
> -        return self._slotlist[i]
> -
> -    def __len__(self):
> -        return len(self._slotlist)
> -
> -    def __iter__(self):
> -        return iter(self._slotlist)
> -
> -#
> -# Queue subclass used by distributed pool
> -#
> -
> -class SettableQueue(Queue.Queue):
> -    def empty(self):
> -        return not self.queue
> -    def full(self):
> -        return self.maxsize > 0 and len(self.queue) == self.maxsize
> -    def set_contents(self, contents):
> -        # length of contents must be at least as large as the number of
> -        # threads which have potentially called get()
> -        self.not_empty.acquire()
> -        try:
> -            self.queue.clear()
> -            self.queue.extend(contents)
> -            self.not_empty.notifyAll()
> -        finally:
> -            self.not_empty.release()
> -
> -Cluster.register('_SettableQueue', SettableQueue)
> -
> -#
> -# Class representing a notional cpu in the cluster
> -#
> -
> -class Slot(object):
> -    def __init__(self, host):
> -        self.host = host
> -        self.Process = host.Process
> -
> -#
> -# Host
> -#
> -
> -class Host(object):
> -    '''
> -    Represents a host to use as a node in a cluster.
> -
> -    `hostname` gives the name of the host.  If hostname is not
> -    "localhost" then ssh is used to log in to the host.  To log in as
> -    a different user use a host name of the form
> -    "username [at] somewhere"
> -
> -    `slots` is used to specify the number of slots for processes on
> -    the host.  This affects how often processes will be allocated to
> -    this host.  Normally this should be equal to the number of cpus on
> -    that host.
> -    '''
> -    def __init__(self, hostname, slots=None):
> -        self.hostname = hostname
> -        self.slots = slots
> -
> -    def _start_manager(self, index, authkey, address, files):
> -        if self.hostname != 'localhost':
> -            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
> -            debug('startup files copied to %s:%s', self.hostname, tempdir)
> -            p = subprocess.Popen(
> -                [.'ssh', self.hostname, 'python', '-c',
> -                 '"import os; os.chdir(%r); '
> -                 'from distributing import main; main()"' % tempdir],
> -                stdin=subprocess.PIPE
> -                )
> -            data = dict(
> -                name='BoostrappingHost', index=index,
> -                dist_log_level=_logger.getEffectiveLevel(),
> -                dir=tempdir, authkey=str(authkey), parent_address=address
> -                )
> -            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
> -            p.stdin.close()
> -
> -#
> -# Copy files to remote directory, returning name of directory
> -#
> -
> -unzip_code = '''"
> -import tempfile, os, sys, tarfile
> -tempdir = tempfile.mkdtemp(prefix='distrib-')
> -os.chdir(tempdir)
> -tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
> -for ti in tf:
> -    tf.extract(ti)
> -print tempdir
> -"'''
> -
> -def copy_to_remote_temporary_directory(host, files):
> -    p = subprocess.Popen(
> -        ['ssh', host, 'python', '-c', unzip_code],
> -        stdout=subprocess.PIPE, stdin=subprocess.PIPE
> -        )
> -    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
> -    for name in files:
> -        tf.add(name, os.path.basename(name))
> -    tf.close()
> -    p.stdin.close()
> -    return p.stdout.read().rstrip()
> -
> -#
> -# Code which runs a host manager
> -#
> -
> -def main():
> -    # get data from parent over stdin
> -    data = pickle.load(sys.stdin)
> -    sys.stdin.close()
> -
> -    # set some stuff
> -    _logger.setLevel(data['dist_log_level'])
> -    forking.prepare(data)
> -
> -    # create server for a `HostManager` object
> -    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
> -    current_process()._server = server
> -
> -    # report server address and number of cpus back to parent
> -    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
> -    conn.send((data['index'], server.address, slot_count))
> -    conn.close()
> -
> -    # set name etc
> -    current_process().set_name('Host-%s:%s' % server.address)
> -    util._run_after_forkers()
> -
> -    # register a cleanup function
> -    def cleanup(directory):
> -        debug('removing directory %s', directory)
> -        shutil.rmtree(directory)
> -        debug('shutting down host manager')
> -    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
> -
> -    # start host manager
> -    debug('remote host manager starting in %s', data['dir'])
> -    server.serve_forever()
>
> Modified: python/trunk/Doc/library/multiprocessing.rst
> ==============================================================================
> --- python/trunk/Doc/library/multiprocessing.rst        (original)
> +++ python/trunk/Doc/library/multiprocessing.rst        Sat Nov 21 15:01:56 2009
> @@ -2230,10 +2230,3 @@
>
>  .. literalinclude:: ../includes/mp_benchmarks.py
>
> -An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process`
> -and others to build a system which can distribute processes and work via a
> -distributed queue to a "cluster" of machines on a network, accessible via SSH.
> -You will need to have private key authentication for all hosts configured for
> -this to work.
> -
> -.. literalinclude:: ../includes/mp_distributing.py
>
> Modified: python/trunk/Lib/multiprocessing/queues.py
> ==============================================================================
> --- python/trunk/Lib/multiprocessing/queues.py  (original)
> +++ python/trunk/Lib/multiprocessing/queues.py  Sat Nov 21 15:01:56 2009
> @@ -47,6 +47,8 @@
>         if sys.platform != 'win32':
>             register_after_fork(self, Queue._after_fork)
>
> +        self.getv = 0
> +
>     def __getstate__(self):
>         assert_spawning(self)
>         return (self._maxsize, self._reader, self._writer,
> @@ -71,6 +73,8 @@
>         self._poll = self._reader.poll
>
>     def put(self, obj, block=True, timeout=None):
> +        if not isinstance(obj, list):
> +            debug('put: %s', obj)
>         assert not self._closed
>         if not self._sem.acquire(block, timeout):
>             raise Full
> @@ -85,11 +89,15 @@
>             self._notempty.release()
>
>     def get(self, block=True, timeout=None):
> +        self.getv += 1
> +        debug('self.getv: %s', self.getv)
>         if block and timeout is None:
>             self._rlock.acquire()
>             try:
>                 res = self._recv()
>                 self._sem.release()
> +                if not isinstance(res, list):
> +                    debug('get: %s', res)
>                 return res
>             finally:
>                 self._rlock.release()
> @@ -104,6 +112,8 @@
>                     raise Empty
>                 res = self._recv()
>                 self._sem.release()
> +                if not isinstance(res, list):
> +                    debug('get: %s', res)
>                 return res
>             finally:
>                 self._rlock.release()
> @@ -229,16 +239,22 @@
>                 try:
>                     while 1:
>                         obj = bpopleft()
> +                        if not isinstance(obj, list):
> +                            debug('feeder thread got: %s', obj)
>                         if obj is sentinel:
>                             debug('feeder thread got sentinel -- exiting')
>                             close()
>                             return
> -
>                         if wacquire is None:
> +                            if not isinstance(obj, list):
> +                                debug('sending to pipe: %s', obj)
>                             send(obj)
>                         else:
> -                            wacquire()
> +                            debug('waiting on wacquire')
> +                            wacquire(timeout=30)
>                             try:
> +                                if not isinstance(obj, list):
> +                                    debug('sending to pipe: %s', obj)
>                                 send(obj)
>                             finally:
>                                 wrelease()
> _______________________________________________
> Python-checkins mailing list
> Python-checkins [at] python
> http://mail.python.org/mailman/listinfo/python-checkins
>
_______________________________________________
Python-checkins mailing list
Python-checkins [at] python
http://mail.python.org/mailman/listinfo/python-checkins

Python checkins RSS feed   Index | Next | Previous | View Threaded
 
 


Interested in having your list archived? Contact Gossamer Threads
 
  Web Applications & Managed Hosting Powered by Gossamer Threads Inc.