dragon.infrastructure.policy.Policy

class Policy

Bases: object

A Policy is used to define the placement or affinity of an object or process within the Dragon allocation to specific physical resources like nodes, CPUs, or GPUs. The use and evaluation of a Policy must be consistent across the multiple ways of starting a process (Process, multiprocessing.Process, ProcessGroup, etc.).

Policy Hierarchy

There are multiple ways and places that you can set a Policy. Dragon has defined the following hierarchy and will attempt to merge multiple policies in priority order (with 1 being the highest priority and higher numbers being a lower priority) to create a single combined policy for any given object:

  1. An explicit policy passed to the Process, ProcessGroup, or Global Service API

  2. If creating a ProcessGroup, the process group’s policy will be merged with the policy (if any) of each ProcessTemplate

  3. Use of the Python-based Policy context manager

  4. The Global Policy
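
The priority order above can be pictured as a fold over partial policies, lowest priority first. The following is only a sketch under stated assumptions, not Dragon's implementation: `SketchPolicy`, `merge`, and `resolve` are hypothetical names, and `None` stands in for "not set".

```python
from dataclasses import dataclass, fields, replace
from functools import reduce
from typing import Optional


@dataclass(frozen=True)
class SketchPolicy:
    """Hypothetical stand-in for a policy; None means "not set"."""
    placement: Optional[str] = None
    host_name: Optional[str] = None
    distribution: Optional[str] = None


def merge(low: SketchPolicy, high: SketchPolicy) -> SketchPolicy:
    """Return a new policy: values set on `high` win, the rest come from `low`."""
    updates = {
        f.name: getattr(high, f.name)
        for f in fields(high)
        if getattr(high, f.name) is not None
    }
    return replace(low, **updates)


def resolve(*lowest_to_highest: SketchPolicy) -> SketchPolicy:
    """Fold partial policies together, lowest priority first."""
    return reduce(merge, lowest_to_highest)


# Priority 4 (global default), priority 3 (context manager), priority 1 (explicit).
global_default = SketchPolicy(placement="ANYWHERE", host_name="", distribution="ROUNDROBIN")
context_policy = SketchPolicy(distribution="BLOCK")
explicit_policy = SketchPolicy(placement="HOST_NAME", host_name="node-1")

combined = resolve(global_default, context_policy, explicit_policy)
print(combined)
```

Starting the fold from a complete default is what guarantees that every field of the combined policy ends up with a value.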

The Global Policy

The global_policy object defines the default policy values to use when not otherwise set. The global_policy object will always be merged with any user supplied Policy before evaluating the policy to ensure that the resultant Policy object is complete and valid.

GS_DEFAULT_POLICY = Policy(
    placement=Policy.Placement.ANYWHERE,
    host_name="",
    host_id=-1,
    distribution=Policy.Distribution.ROUNDROBIN,
    cpu_affinity=[],
    gpu_env_str="",
    gpu_affinity=[],
    wait_mode=Policy.WaitMode.IDLE,
    refcounted=True,
)

There is currently no way to modify the global_policy object.

Examples

No Policy

In the case that no policy is passed, the object being created will use the default global_policy.

from dragon.native import Process

process = Process(
    target=cmdline,
)

Single Explicit policy

Any user supplied policy passed to the object’s constructor, or Global Service API, will first be merged with the global_policy object. The resulting merged Policy is what will be used by the object being created.

from dragon.infrastructure.policy import Policy
from dragon.native import Process

policy = Policy(placement=Policy.Placement.LOCAL)
process = Process(
    target=cmdline,
    policy=policy,
)

The resulting Policy for the above example will be:

Policy(
       placement=Policy.Placement.LOCAL,              # from policy passed to object constructor.
       distribution=Policy.Distribution.ROUNDROBIN,   # from global_policy.
       wait_mode=Policy.WaitMode.IDLE,                # from global_policy. Not currently used.
       refcounted=True,                               # from global_policy. Not currently used.
       ...
       )

Using Policy Context Manager

The Policy context manager can be used to establish a thread-local stack of Policy objects. Any Process or ProcessGroup object that is created within this context will inherit the Policy defined by the nested stack of Policy objects.

import multiprocessing as mp
import socket

from dragon.infrastructure.policy import Policy

with Policy(placement=Policy.Placement.HOST_NAME, host_name=socket.gethostname()):
    proc = mp.Process(target=cmdline)
    proc.start()

In the above case, the Policy context manager is used to help place a Python multiprocessing.Process object, which otherwise does not accept a policy parameter, within the Dragon allocation on a specific host.
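
How a thread-local stack behaves can be sketched with the standard library alone. This is an illustration of the general technique, not Dragon's code: `ScopedSetting`, `_stack`, and `active_settings` are hypothetical names, and plain keyword values stand in for Policy fields.

```python
import threading

_local = threading.local()  # each thread gets its own attribute namespace


def _stack() -> list:
    """Return the calling thread's stack, creating it on first use."""
    if not hasattr(_local, "stack"):
        _local.stack = []
    return _local.stack


class ScopedSetting:
    """Hypothetical context manager: pushes itself on entry, pops on exit."""

    def __init__(self, **values):
        self.values = values

    def __enter__(self):
        _stack().append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        _stack().pop()
        return False  # do not swallow exceptions


def active_settings() -> list:
    """Snapshot of the calling thread's stack, outer-most first."""
    return [s.values for s in _stack()]


seen_in_other_thread = []

with ScopedSetting(placement="HOST_NAME", host_name="node-1"):
    with ScopedSetting(distribution="BLOCK"):
        inside = active_settings()
        # A different thread has its own, empty stack.
        worker = threading.Thread(
            target=lambda: seen_in_other_thread.append(active_settings())
        )
        worker.start()
        worker.join()

after = active_settings()
print(inside, seen_in_other_thread, after)
```

Because the stack lives in `threading.local()`, a context entered in one thread is invisible to code running in another thread, and leaving the `with` block restores the previous state.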

import socket

from dragon.infrastructure.policy import Policy
from dragon.native import Process

with Policy(placement=Policy.Placement.HOST_NAME, host_name=socket.gethostname()):
    policy = Policy(gpu_affinity=GPU_AFFINITY)
    process = Process(target=cmdline, policy=policy)

In this example, the policy object passed to the Process object constructor and the policy created via the Policy context manager will be merged. The resultant policy will be:

Policy(
       placement=Policy.Placement.HOST_NAME,          # from Policy Context Manager.
       host_name=socket.gethostname(),                # from Policy Context Manager.
       distribution=Policy.Distribution.ROUNDROBIN,   # from global_policy.
       gpu_affinity=GPU_AFFINITY,                     # from explicit policy passed to the Process object.
       wait_mode=Policy.WaitMode.IDLE,                # from global_policy. Not currently used.
       refcounted=True,                               # from global_policy. Not currently used.
       )

In a case where there are multiple, nested Policy context managers, the policy of each context manager will be merged together (from inner-most to outer-most) before being merged with any policy passed to the object being created.

import socket

from dragon.infrastructure.policy import Policy
from dragon.native import Process

with Policy(placement=Policy.Placement.ANYWHERE, distribution=Policy.Distribution.BLOCK):
    with Policy(placement=Policy.Placement.HOST_NAME, host_name=socket.gethostname()):
        policy = Policy(gpu_affinity=GPU_AFFINITY)
        process = Process(target=cmdline, policy=policy)

In this case, the resultant policy will be:

Policy(
       distribution=Policy.Distribution.BLOCK,  # from outer-most context manager.
       placement=Policy.Placement.HOST_NAME,    # from inner-most context manager.
       host_name=socket.gethostname(),          # from inner-most context manager.
       gpu_affinity=GPU_AFFINITY,               # from explicit policy passed to the constructor.
       wait_mode=Policy.WaitMode.IDLE,          # from the global_policy. Not currently used.
      )
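
The lookup order in this nested case can be modelled with `collections.ChainMap`, which searches its mappings from left to right. This is a sketch of the precedence only: plain dicts stand in for partial policies, a missing key means "not set", and the host name and GPU list are made-up values.

```python
from collections import ChainMap

# Plain dicts stand in for partial policies; a missing key means "not set".
global_defaults = {
    "placement": "ANYWHERE",
    "host_name": "",
    "distribution": "ROUNDROBIN",
    "gpu_affinity": [],
}
outer_context = {"placement": "ANYWHERE", "distribution": "BLOCK"}
inner_context = {"placement": "HOST_NAME", "host_name": "node-1"}
explicit = {"gpu_affinity": [0, 1]}

# ChainMap returns the first match, so list the maps highest priority first.
resolved = dict(ChainMap(explicit, inner_context, outer_context, global_defaults))
print(resolved)
```

The inner context overrides the outer context's `placement`, while `distribution` survives from the outer context because nothing of higher priority sets it.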

Starting a ProcessGroup

When creating a process group, a policy can be set on both the process group and its process templates. The global policy will first be merged with the process group policy. The resultant policy will then be merged with each process template’s policy.

from dragon.infrastructure.policy import Policy
from dragon.native.process_group import ProcessGroup
from dragon.native.process import MSG_PIPE, MSG_DEVNULL, Process, ProcessTemplate

group_policy = Policy(distribution=Policy.Distribution.BLOCK)
grp = ProcessGroup(restart=False, pmi_enabled=True, policy=group_policy)

template_policy_1 = Policy(gpu_affinity=GPU_AFFINITY_1)

# Pipe the stdout output from the head process to a Dragon connection
grp.add_process(
    nproc=1,
    template=ProcessTemplate(
        target=exe, args=args, cwd=run_dir, stdout=MSG_PIPE, policy=template_policy_1
    ),
)

template_policy_2 = Policy(gpu_affinity=GPU_AFFINITY_2)

# All other ranks should have their output go to DEVNULL
grp.add_process(
    nproc=num_ranks - 1,
    template=ProcessTemplate(
        target=exe, args=args, cwd=run_dir, stdout=MSG_DEVNULL, policy=template_policy_2
    ),
)

The first process template will have a policy matching:

Policy(
       placement=Policy.Placement.ANYWHERE,           # from global_policy
       distribution=Policy.Distribution.BLOCK,        # from group policy
       gpu_affinity=GPU_AFFINITY_1,                   # from template_policy_1
       wait_mode=Policy.WaitMode.IDLE,                # from global_policy. Not currently used.
       refcounted=True,                               # from global_policy. Not currently used.
      )

The rest of the process templates will have a policy matching:

Policy(
       placement=Policy.Placement.ANYWHERE,           # from global_policy
       distribution=Policy.Distribution.BLOCK,        # from group policy
       gpu_affinity=GPU_AFFINITY_2,                   # from template_policy_2
       wait_mode=Policy.WaitMode.IDLE,                # from global_policy. Not currently used.
       refcounted=True,                               # from global_policy. Not currently used.
)
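
The two-stage merge for a process group can be sketched with plain dictionaries. This is a simplified model rather than Dragon's code: `merge` is a hypothetical helper, a missing key means "not set", and the GPU lists are made-up values.

```python
def merge(low: dict, high: dict) -> dict:
    """Return a new dict in which keys set in `high` override those in `low`."""
    return {**low, **high}


global_defaults = {"placement": "ANYWHERE", "distribution": "ROUNDROBIN", "gpu_affinity": []}
group_policy = {"distribution": "BLOCK"}
template_policies = [{"gpu_affinity": [0]}, {"gpu_affinity": [1]}]

# Merge the defaults with the group policy once, then with each template policy.
group_level = merge(global_defaults, group_policy)
per_template = [merge(group_level, template) for template in template_policies]

for resolved in per_template:
    print(resolved)
```

Each template ends up with the group's `distribution` and its own `gpu_affinity`, and the group-level result is left unchanged because `merge` always builds a new dict.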

__init__(placement: Placement = Placement.DEFAULT, host_name: str = '', host_id: int = -1, distribution: Distribution = Distribution.DEFAULT, cpu_affinity: list[int] = <factory>, gpu_env_str: str = '', gpu_affinity: list[int] = <factory>, wait_mode: WaitMode = WaitMode.DEFAULT, refcounted: bool = True) -> None

Methods

__init__([placement, host_name, host_id, ...])

from_sdict(sdict)

get_sdict()

global_policy()

merge(low_policy, high_policy)

Merge two policies, using values from high_policy for values not assigned on init. Returns a new policy.

thread_policy()

Attributes

distribution

gpu_env_str

host_id

host_name

placement

refcounted

sdesc

wait_mode

cpu_affinity

gpu_affinity

class Placement

Bases: IntEnum

Which node to assign a policy to.

LOCAL and ANYWHERE will be useful later for multi-system communication. Right now Placement will have little effect unless HOST_NAME or HOST_ID is used, in which case Dragon will try to place the object on the specified node.

  • LOCAL - Local to current system of nodes

  • ANYWHERE - Place anywhere

  • HOST_NAME - Place on node with specific name

  • HOST_ID - Place on node with specific ID

  • DEFAULT - Defaults to ANYWHERE

LOCAL = -5
ANYWHERE = -4
HOST_NAME = -3
HOST_ID = -2
DEFAULT = -1
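
One way to picture the effect of these values is as a filter over the available nodes. The sketch below is hypothetical and is not how Dragon's Global Services select a node: `candidate_nodes` and the `(host_id, host_name)` pairs are invented for illustration, and the enum is a local copy of the values listed above so the example runs without Dragon.

```python
from enum import IntEnum


class Placement(IntEnum):
    """Local copy of the values listed above, so the example runs without Dragon."""
    LOCAL = -5
    ANYWHERE = -4
    HOST_NAME = -3
    HOST_ID = -2
    DEFAULT = -1


def candidate_nodes(placement, nodes, host_name="", host_id=-1):
    """Hypothetical helper: narrow `nodes`, a list of (host_id, host_name) pairs."""
    if placement == Placement.HOST_NAME:
        return [n for n in nodes if n[1] == host_name]
    if placement == Placement.HOST_ID:
        return [n for n in nodes if n[0] == host_id]
    # LOCAL, ANYWHERE and DEFAULT do not narrow the choice in this sketch.
    return list(nodes)


nodes = [(101, "node-a"), (102, "node-b"), (103, "node-c")]
by_name = candidate_nodes(Placement.HOST_NAME, nodes, host_name="node-b")
by_id = candidate_nodes(Placement.HOST_ID, nodes, host_id=103)
unrestricted = candidate_nodes(Placement.DEFAULT, nodes)
print(by_name, by_id, unrestricted)
```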
conjugate()

Returns self, the complex conjugate of any int.

bit_length()

Number of bits necessary to represent self in binary.

>>> bin(37)
'0b100101'
>>> (37).bit_length()
6
bit_count()

Number of ones in the binary representation of the absolute value of self.

Also known as the population count.

>>> bin(13)
'0b1101'
>>> (13).bit_count()
3
to_bytes(length=1, byteorder='big', *, signed=False)

Return an array of bytes representing an integer.

length

Length of bytes object to use. An OverflowError is raised if the integer is not representable with the given number of bytes. Default is length 1.

byteorder

The byte order used to represent the integer. If byteorder is ‘big’, the most significant byte is at the beginning of the byte array. If byteorder is ‘little’, the most significant byte is at the end of the byte array. To request the native byte order of the host system, use sys.byteorder as the byte order value. Default is to use ‘big’.

signed

Determines whether two’s complement is used to represent the integer. If signed is False and a negative integer is given, an OverflowError is raised.

classmethod from_bytes(bytes, byteorder='big', *, signed=False)

Return the integer represented by the given array of bytes.

bytes

Holds the array of bytes to convert. The argument must either support the buffer protocol or be an iterable object producing bytes. Bytes and bytearray are examples of built-in objects that support the buffer protocol.

byteorder

The byte order used to represent the integer. If byteorder is ‘big’, the most significant byte is at the beginning of the byte array. If byteorder is ‘little’, the most significant byte is at the end of the byte array. To request the native byte order of the host system, use sys.byteorder as the byte order value. Default is to use ‘big’.

signed

Indicates whether two’s complement is used to represent the integer.
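
Because every Placement value is negative, a byte round trip needs `signed=True`. The short example below uses only built-in `int` methods; the value -3 is taken from the listing above, and 1024 is an arbitrary number chosen to show the effect of byte order.

```python
value = -3  # same integer value as HOST_NAME in the listing above

# A negative integer needs signed=True; one signed byte holds -128..127.
encoded = value.to_bytes(1, byteorder="big", signed=True)
decoded = int.from_bytes(encoded, byteorder="big", signed=True)

# Byte order only matters once more than one byte is involved.
big = (1024).to_bytes(2, byteorder="big")
little = (1024).to_bytes(2, byteorder="little")

print(encoded, decoded, big, little)
```

Calling `to_bytes` on a negative value without `signed=True` raises OverflowError, as described above.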

as_integer_ratio()

Return integer ratio.

Return a pair of integers, whose ratio is exactly equal to the original int and with a positive denominator.

>>> (10).as_integer_ratio()
(10, 1)
>>> (-10).as_integer_ratio()
(-10, 1)
>>> (0).as_integer_ratio()
(0, 1)
real

the real part of a complex number

imag

the imaginary part of a complex number

numerator

the numerator of a rational number in lowest terms

denominator

the denominator of a rational number in lowest terms

classmethod __contains__(member)

Return True if member is a member of this enum. Raises TypeError if member is not an enum member.

note: in 3.12 TypeError will no longer be raised, and True will also be returned if member is the value of a member in this enum

classmethod __getitem__(name)

Return the member matching name.

__init__(*args, **kwds)
classmethod __len__()

Return the number of members (no aliases)
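
The class-level lookups above behave as they do for any `IntEnum`. The sketch below uses a local copy of the Placement values so it runs without Dragon; only a membership test with an actual enum member is shown, since the behavior for non-members depends on the Python version, as the note above explains.

```python
from enum import IntEnum


class Placement(IntEnum):
    """Local copy of the values listed above, so the example runs without Dragon."""
    LOCAL = -5
    ANYWHERE = -4
    HOST_NAME = -3
    HOST_ID = -2
    DEFAULT = -1


by_name = Placement["HOST_NAME"]          # __getitem__: look up by member name
by_value = Placement(-3)                  # calling the class looks up by value
count = len(Placement)                    # __len__: number of members
is_member = Placement.LOCAL in Placement  # __contains__ with an enum member

print(by_name, by_value, count, is_member)
```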

class Distribution

Bases: IntEnum

Pattern to use to distribute policies across nodes

  • ROUNDROBIN

  • BLOCK

  • DEFAULT - Defaults to ROUNDROBIN

ROUNDROBIN = 1
BLOCK = 2
DEFAULT = 3
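
The two patterns can be illustrated with the common textbook meaning of the terms. This is a sketch only: `round_robin` and `block` are hypothetical helpers, and Dragon's actual assignment may differ, for example by taking per-node capacity into account.

```python
def round_robin(num_procs: int, num_nodes: int) -> list[int]:
    """Process i goes to node i mod num_nodes, cycling through the nodes."""
    return [i % num_nodes for i in range(num_procs)]


def block(num_procs: int, num_nodes: int) -> list[int]:
    """Consecutive processes share a node; nodes are filled in contiguous chunks."""
    per_node = -(-num_procs // num_nodes)  # ceiling division
    return [i // per_node for i in range(num_procs)]


print(round_robin(6, 3))  # [0, 1, 2, 0, 1, 2]
print(block(6, 3))        # [0, 0, 1, 1, 2, 2]
```

Each list gives the node index chosen for process 0, 1, 2, and so on: round-robin spreads neighbouring processes across nodes, while block keeps them together.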

class WaitMode

Bases: IntEnum

Channel WaitMode type

IDLE = 1
SPIN = 2
DEFAULT = 3

placement: Placement = -1
host_name: str = ''
host_id: int = -1
distribution: Distribution = 3
cpu_affinity: list[int]
gpu_env_str: str = ''
gpu_affinity: list[int]
wait_mode: WaitMode = 3
refcounted: bool = True
property sdesc: Dict
get_sdict() -> Dict
classmethod from_sdict(sdict)
classmethod thread_policy()
classmethod merge(low_policy, high_policy)

Merge two policies, using values from high_policy for values not assigned on init. Returns a new policy.

Parameters:
  • low_policy (Policy) – Default values will be replaced by high_policy values

  • high_policy (Policy) – Non-default values take precedence

Returns:

Merged policy object

Return type:

Policy
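
A merge of this kind can be sketched by comparing each field of high_policy with its declared default. The code below is an assumption about the general approach, not Dragon's implementation: `SketchPolicy`, `Mode`, and `_default_of` are hypothetical names, and only three fields are modelled.

```python
from dataclasses import MISSING, dataclass, field, fields, replace
from enum import IntEnum


class Mode(IntEnum):
    """Hypothetical enum with a DEFAULT sentinel, mirroring WaitMode's values."""
    IDLE = 1
    SPIN = 2
    DEFAULT = 3


@dataclass
class SketchPolicy:
    """Hypothetical stand-in; not the Dragon Policy class."""
    host_name: str = ""
    wait_mode: Mode = Mode.DEFAULT
    cpu_affinity: list = field(default_factory=list)


def _default_of(f):
    """Value a field takes when it is not assigned on init."""
    return f.default_factory() if f.default_factory is not MISSING else f.default


def merge(low_policy: SketchPolicy, high_policy: SketchPolicy) -> SketchPolicy:
    """Return a new policy; non-default values of high_policy take precedence."""
    overrides = {
        f.name: getattr(high_policy, f.name)
        for f in fields(high_policy)
        if getattr(high_policy, f.name) != _default_of(f)
    }
    return replace(low_policy, **overrides)


low = SketchPolicy(host_name="node-1", wait_mode=Mode.IDLE, cpu_affinity=[0, 1])
high = SketchPolicy(wait_mode=Mode.SPIN)
merged = merge(low, high)
print(merged)
```

One consequence of comparing against defaults in this sketch is that a value explicitly set to its default on high_policy cannot be told apart from an unset one, so it never overrides low_policy.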

classmethod global_policy()