
Dynamite - blasting obstacles to parallel cluster computing

Amsterdam - Monday 25th January 1999

Workstations make up a very large fraction of the total available computing capacity in many organisations. In order to use this capacity optimally, dynamic allocation of computing resources is needed. The Esprit project Dynamite addresses this load balancing problem through the migration of tasks in a dynamically linked parallel program. An important goal of the project is to accomplish this in a manner that is transparent both to the application programmer and to the user. As a test bed, the Pam-Crash software from ESI is used.

Introduction

Workstations have become ubiquitous in many organisations. By their nature, they are often used intensively during normal working hours, and are often largely idle otherwise. They represent a huge reservoir of computing capacity that can be used much more efficiently.

Thus, we currently witness a shift of emphasis in high-performance computing from expensive, special-purpose monolithic systems to the use of clusters of workstations or PCs.

When using time-shared workstation clusters as HPC compute servers, however, one has to cope with the dynamic behaviour of the compute nodes, the network load and the application tasks. This behaviour can lead to local load imbalances, which hamper both the application's execution speed and the overall system performance.

The application itself can also exhibit dynamic behaviour due to changes in the load per task (e.g. contact problems in car crash simulations). This leads to serious load imbalances, which are difficult to resolve, even on dedicated parallel platforms that offer a constant performance per node. When the node performance changes dynamically, as in workstation clusters, the situation becomes even more difficult.

Also, running an HPC task on a workstation may jeopardise its primary purpose: providing computing capacity to a particular employee.

Solving these problems requires that work somehow be migrated from one node to another. This can be done within the parallel application itself, but such an approach requires a major adaptation of each individual program. Various solutions have been developed to improve the load distribution on workstations. These range from systems that schedule parallel or sequential jobs on free workstations, such as LSF [1], via systems that can also migrate sequential jobs, such as Codine [2] and Condor [3, 4], to systems that also aim to migrate tasks in parallel jobs: MPVM/MIST [5, 6] does this for PVM-based jobs, and Hector [7] for MPI.

In the ESPRIT project 23499 "DYNAMITE", we develop a dynamic execution environment that handles the load balancing of parallel applications in a dynamically changing cluster environment by migrating individual tasks in a manner that is robust, efficient and transparent to the user and the application programmer. The DYNAMITE software is based on PVM 3.3.11 and is called Dynamic PVM [8] or DPVM for short. DPVM is totally transparent to the user's application: existing PVM codes need only be linked to the DPVM library. The DYNAMITE system is intended for environments requiring a relatively infrequent redistribution of workload for large applications that can run for several days. We strive for a response time of at most a few minutes and a minimal overhead, but give an absolute priority to reliability and stability.

In constructing such an environment, the following problems need to be addressed:

  • migration of dynamically linked tasks,
  • migration of communication endpoints,
  • load monitoring,
  • task (re-)allocation,
  • job preparation.

In this paper, we describe the ongoing work in the DYNAMITE project. The first two issues will be addressed in the next section. Subsequently we will address load monitoring, task allocation and job preparation in separate sections, before coming to our conclusions.

Migration

Migrating a task requires that its state be captured, after which a new task is started on the target machine and initialised with the captured state. Correct migration is difficult because the interactions of the task with its environment need to be taken into account. A completely transparent migration, which cannot be detected by the task or its communication partners, is almost impossible to realise, but it is not usually necessary either. We aim to migrate dynamically linked tasks that have open files and that communicate with other tasks solely through PVM.

For the migration of tasks with open files, we impose the additional requirement that these files can be accessed using the same path on both the source and target machine.

We have implemented the migration mechanism making use of a full checkpoint of the task. Though it requires additional communication and I/O compared to a mechanism based on a direct transfer of the task image from source to target machine, we have decided to use this approach for reasons of robustness and clarity of implementation.

Pilot versions of the checkpointer and migrator were implemented for the SUN Solaris operating system, and have been tested on OS versions 2.5.1 and 2.6 on UltraSparc workstations.

Migration of dynamically linked code

As stated above, the first step in migrating a task is to make a checkpoint dump.

The checkpointing implementation used in DYNAMITE differs from existing implementations in two ways. Firstly, the checkpointing code is not linked into the program itself. Instead, it is present in the dynamic loader, a piece of code loaded before the actual program is run. The task of the dynamic loader is to load the shared libraries required by the program. Most Unix systems implement shared libraries using a dynamic loader, and have an option to specify a different loader for each program. This option is used to specify our own dynamic loader.

In DYNAMITE, this dynamic loader performs these tasks as usual, but also contains code to handle checkpointing signals and to keep track of the shared libraries in use. It can therefore create the checkpoint and restore it using exactly the same memory mappings for the shared libraries. This is important because shared libraries are not normally guaranteed to be mapped at the same memory address in a new process; if they were mapped elsewhere, restarting the application would be impossible.

The other new aspect of the checkpointing code is the propagation of checkpointing signals: before creating the checkpoint, the dynamic loader signals the application, allowing it to save any state that cannot be saved by the normal checkpointing procedure.
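A minimal sketch of the signal-propagation idea, assuming a POSIX system. A stand-in for the loader's signal handler first calls a hook registered by the application, so that the application can record state the checkpointer cannot capture, and only then writes the dump. The signal (`SIGUSR1`) and all function names are hypothetical; the actual DYNAMITE loader interface is not described here, and the dump itself is replaced by a counter.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <stddef.h>

/* Hypothetical application hook, run just before the dump is written. */
typedef void (*pre_checkpoint_hook)(void);

static pre_checkpoint_hook app_hook = NULL;
static volatile sig_atomic_t dumps_taken = 0;

/* Application side: register a routine that saves state the
   checkpointer cannot capture by itself. */
void dpvm_register_pre_checkpoint(pre_checkpoint_hook hook)
{
    assert(hook != NULL);
    app_hook = hook;
}

/* Placeholder for writing the memory image; here it only counts calls. */
static void do_checkpoint_dump(void)
{
    dumps_taken = dumps_taken + 1;
}

/* Loader side: runs when the (hypothetical) checkpoint signal arrives. */
static void loader_checkpoint_handler(int sig)
{
    (void)sig;
    if (app_hook != NULL)
        app_hook();          /* 1. propagate: let the application save state */
    do_checkpoint_dump();    /* 2. then write the checkpoint itself          */
}

/* In DYNAMITE the loader would install this before main() runs.
   Returns 0 on success, -1 on failure (as sigaction does). */
int loader_install_checkpoint_handler(void)
{
    struct sigaction sa;
    sa.sa_handler = loader_checkpoint_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    return sigaction(SIGUSR1, &sa, NULL);
}

int checkpoint_count(void) { return (int)dumps_taken; }

/* Example hook: it runs in signal context, so it only sets a flag
   (async-signal-safe); real state saving must obey the same rule. */
static volatile sig_atomic_t state_saved = 0;
void example_save_state(void) { state_saved = 1; }
int example_state_saved(void) { return (int)state_saved; }
```

Because the hook runs inside a signal handler, it is restricted to async-signal-safe operations; a real application would typically just note what has to be re-established after the restart.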

The current implementation of the checkpointer has the following limitations:

  • The checkpointed task should not be multithreaded. This limitation applies to all PVM programs anyway.
  • The checkpointed task should not have any files or network connections open, save for those serviced by PVM. Migration of open files will be supported in a later version.
  • The checkpointer writes a full checkpoint to a file, including any mapped dynamic libraries and the complete data segment. An earlier version of DPVM used a migration approach in which most of the data segment was transferred directly from the old to the new task image through a socket. While this approach has a speed advantage, it hampers a robust implementation.

The original task need not still be active when the checkpoint is restarted, as it would have to be if part of the image were transferred directly from the old to the new task. The checkpoint file is an executable in its own right, and can thus be restarted in the usual way.

The job of the migrator, which is part of the DPVM library, is to start a new task on the target machine, using the checkpointed executable.

Migration of communicating tasks

A main objective of the DPVM migration facility is transparency of the migration protocol. For the task selected for migration, this implies transparent suspension and resumption of execution: the task has no notion that it is being migrated to another host, and although communication may be delayed while one of the tasks migrates, it must not fail. The work on which our implementation is based is described in [8].

The first step in the migration protocol is the creation of a new process context at the destination host by sending a message to the PVM daemon (pvmd) representing that host. Next, the master pvmd updates its routing table to reflect the new location of the task. Before the task selected for migration is suspended, the communication between this task and its pvmd has to be flushed. Then the task is disconnected from its local pvmd and messages arriving for that task are refused by the task's original pvmd. The master pvmd will now broadcast the new location to all other pvmds, so that any subsequent message is directed to the task's new location.

The next phase is the actual migration of the process. The original task is checkpointed and the newly created process on the destination host is requested to restart the checkpoint.

Finally, after the checkpoint has been read, the original state of the task (including its data, stack, signal mask and registers) is restored and the task is restarted. Any messages that arrived during the checkpoint/migration phase are then delivered to the restarted task.
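The ordering of the protocol steps, and the fact that messages arriving while the task is detached are held back and released only after the restart, can be illustrated with a small single-process simulation. The phase names, the fixed-size arrays, and the choice to model held messages as a local queue (rather than modelling which pvmd holds or re-sends them) are assumptions made for illustration; this is not DPVM code.

```c
#include <assert.h>

#define MAX_MSGS 16

/* Protocol phases, in the order described in the text. */
enum mig_phase {
    PH_RUNNING,       /* task runs normally on the source host             */
    PH_NEW_CONTEXT,   /* new process context created on the destination    */
    PH_ROUTE_UPDATED, /* master pvmd's routing table points at destination */
    PH_FLUSHED,       /* traffic between task and local pvmd flushed       */
    PH_DISCONNECTED,  /* task suspended and detached: messages are held    */
    PH_CHECKPOINTED,  /* checkpoint written; new process restarts from it  */
    PH_RESTARTED      /* state restored on destination; held msgs released */
};

struct migrating_task {
    enum mig_phase phase;
    int held[MAX_MSGS];       /* messages that arrived while detached */
    int n_held;
    int delivered[MAX_MSGS];  /* messages handed to the application   */
    int n_delivered;
};

static int detached(enum mig_phase ph)
{
    return ph >= PH_DISCONNECTED && ph < PH_RESTARTED;
}

/* Advance exactly one protocol step; steps cannot be skipped. */
void mig_step(struct migrating_task *t)
{
    assert(t->phase != PH_RESTARTED);
    t->phase = (enum mig_phase)(t->phase + 1);
    if (t->phase == PH_RESTARTED) {
        /* Deliver the held messages in their arrival order. */
        assert(t->n_delivered + t->n_held <= MAX_MSGS);
        for (int i = 0; i < t->n_held; i++)
            t->delivered[t->n_delivered++] = t->held[i];
        t->n_held = 0;
    }
}

/* A message arrives for the task: held while detached, else delivered. */
void mig_receive(struct migrating_task *t, int msg)
{
    if (detached(t->phase)) {
        assert(t->n_held < MAX_MSGS);
        t->held[t->n_held++] = msg;
    } else {
        assert(t->n_delivered < MAX_MSGS);
        t->delivered[t->n_delivered++] = msg;
    }
}
```

Messages received before the restart and messages received after it end up in one sequence in arrival order, which is the property the application relies on.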

Packet Routing

In PVM the task identifier, task id for short, is a unique identifier that serves as the task's address and therefore may be distributed to other PVM tasks for communication purposes. For this reason, the task id must remain unchanged during the lifetime of a task, even when the task is migrated.

This has implications for the routing of message packets. The task id contains the identifier of the host on which the task is enrolled and a task sequence number. The pvmd uses this information to route packets to their destination, i.e., to the appropriate pvmd and task. When a task is migrated to another host, this routing information is no longer correct. An additional routing facility must therefore be incorporated in the pvmd routing software to support task migration. An important design constraint is that this facility must be highly efficient and must not impose additional limits on scalability.

To provide transparent and correct message routing for migrating tasks, the task ids must be made location-independent, i.e. virtualised. This is accomplished by maintaining additional routing tables in all pvmds, which are consulted for all inter-task communication. Upon migration of a task, the routing table of the master pvmd is first updated to reflect the new location of the migrated task. The master pvmd then broadcasts the change to all other pvmds, so that every routing table reflects the actual location of all migrated tasks in the system.
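A minimal sketch of location-independent routing: the pvmd first consults a table of migrated tasks and falls back to the host encoded in the task id only if the task has never moved. The bit layout (host number in bits 18 to 29, local sequence number in the low 18 bits) is an assumption for illustration, as are all names; a linear table stands in for whatever lookup structure the pvmds actually use.

```c
#include <assert.h>
#include <stdint.h>

#define TID_HOST_SHIFT 18
#define TID_HOST_MASK  0xfffu     /* 12-bit host field (assumed layout) */
#define TID_LOCAL_MASK 0x3ffffu   /* 18-bit local sequence number       */
#define MAX_MIGRATED   64

/* Build a task id from the host it enrolled on and a sequence number. */
uint32_t make_tid(uint32_t host, uint32_t seq)
{
    return ((host & TID_HOST_MASK) << TID_HOST_SHIFT) | (seq & TID_LOCAL_MASK);
}

/* One entry per migrated task: the tid never changes, the host does. */
struct route_entry { uint32_t tid; uint32_t host; };

struct route_table {
    struct route_entry e[MAX_MIGRATED];
    int n;
};

/* Record (or update) the current location of a migrated task. Every
   pvmd would apply the same update when the master broadcasts it. */
void route_set(struct route_table *rt, uint32_t tid, uint32_t new_host)
{
    for (int i = 0; i < rt->n; i++)
        if (rt->e[i].tid == tid) { rt->e[i].host = new_host; return; }
    assert(rt->n < MAX_MIGRATED);
    rt->e[rt->n].tid = tid;
    rt->e[rt->n].host = new_host;
    rt->n++;
}

/* Host to which packets for `tid` must be sent. */
uint32_t route_lookup(const struct route_table *rt, uint32_t tid)
{
    for (int i = 0; i < rt->n; i++)
        if (rt->e[i].tid == tid)
            return rt->e[i].host;                    /* migrated task */
    return (tid >> TID_HOST_SHIFT) & TID_HOST_MASK;  /* never moved   */
}
```

Only migrated tasks occupy table entries, so a task that has never moved is routed exactly as before; for large systems a hash table would be the natural replacement for the linear scan.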

Direct Connections

The basic mode of communication in PVM is through the daemon. For reasons of efficiency, PVM allows tasks to request a direct connection to another task. This complicates the rerouting of the communication. The main problem of direct communication connections is making sure that all communication has been flushed. Simply breaking the connections may result in loss of messages and is not acceptable. Several approaches are possible. Some involve shutting down communication for the whole system temporarily, but this may cause unnecessary delay. Another approach is to leave an agent in place that takes care of the connection as long as it has not been confirmed as flushed by the other side.
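One generic way to guarantee that a direct connection is empty before it is torn down is a marker handshake: the side that wants to close stops sending data and writes an end-of-stream marker, and the other side keeps consuming until it sees that marker. The sketch below simulates this on an in-memory FIFO. It is a textbook technique shown for illustration, not necessarily the mechanism DPVM adopts; the marker value and all names are hypothetical, and data values must never equal the marker.

```c
#include <assert.h>

#define CAP 32
#define FLUSH_MARKER (-1)   /* hypothetical end-of-stream token */

/* In-memory stand-in for one direction of a direct task-to-task socket. */
struct channel {
    int buf[CAP];
    int head, tail;
};

static void ch_put(struct channel *c, int v)
{
    assert(c->tail - c->head < CAP);
    c->buf[c->tail++ % CAP] = v;
}

static int ch_empty(const struct channel *c) { return c->head == c->tail; }

static int ch_get(struct channel *c)
{
    assert(!ch_empty(c));   /* a real receiver would block here */
    return c->buf[c->head++ % CAP];
}

/* Sender: after its last data message, announce the flush. */
void begin_flush(struct channel *c) { ch_put(c, FLUSH_MARKER); }

/* Receiver: drain every message queued before the marker into `out`.
   Returns the number of data messages drained; once the marker has
   been consumed, the connection can be closed without loss. */
int drain_until_marker(struct channel *c, int *out)
{
    int n = 0;
    for (;;) {
        int v = ch_get(c);
        if (v == FLUSH_MARKER)
            return n;
        out[n++] = v;
    }
}
```

Unlike shutting down all communication in the system, this handshake only involves the two endpoints of the connection being closed.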

A related problem occurs when implementing task migration in MPI. MPI is a specification that is often implemented using direct connections, as in MPICH, a popular MPI implementation. See [7] for one possible solution to the problems that arise when implementing task migration for this MPI implementation.

Resource Monitoring

Any migration decision has to be based on the information currently available about the cluster, covering both the state of the hardware and the runtime behaviour of the applications. The typical approach taken by most cluster management systems is to measure the load on each available host and of each application process. The busiest tasks are then moved to the least loaded nodes until a satisfactory state is achieved. This strategy has proven well suited to running independent jobs on networks of workstations, but it performs less well for parallel applications because it completely neglects the communication between interdependent tasks. This drawback is especially apparent in environments with significant performance differences between the nodes. In such scenarios, larger machines (typically SMP or NUMA systems with 4 to 16 processors) are often assigned multiple processes, and it is then desirable to group frequently communicating tasks together on these big machines (Figure 1).

Fig. 1. Grouped Tasks

In the first case, the sequential load is equally balanced but the communication is not. Therefore, the monitoring tool must also keep track of the communication between the tasks. In order to make an optimal migration decision, the following information is needed:

  • available capacity on each node (CPU, memory, disk space),
  • current load of each node,
  • required capacity for each task,
  • network connectivity and capacity,
  • communication pattern for each task.

Each of these items can be measured at execution time by monitoring software, but we assume that node capacity and network properties are sufficiently stable that they can best be specified beforehand by the system administrator. Therefore, we have chosen a textual representation of the static resources (see [9] for further details).

Detailed information about the network topology can be obtained from a "Network Resource Description" file that is used for migration decisions. Tasks should preferably be migrated to nodes in the same subnet. This keeps messages local and avoids routing large amounts of data from one subnet to another. If locality cannot be achieved, nodes in adjacent subnets are selected.
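The locality rule can be sketched as a two-pass search: first look for the best candidate in the task's own subnet, and only if none qualifies widen the search to adjacent subnets. The node record, the `min_capacity` cut-off, and the notion of adjacency used here (subnet numbers differing by one) are hypothetical simplifications; in DYNAMITE the topology comes from the Network Resource Description file, whose format is not reproduced here.

```c
#include <assert.h>
#include <stdlib.h>

struct node {
    int subnet;        /* subnet number taken from the resource description */
    double capacity;   /* normalised available capacity, 0..1               */
};

/* Hypothetical adjacency test: subnets i and i+1 are neighbours. */
static int adjacent(int a, int b) { return abs(a - b) == 1; }

/* Index of the node with the largest capacity >= min_capacity, either in
   the same subnet (same_subnet != 0) or in an adjacent one; -1 if none. */
static int best_in(const struct node *nodes, int n, int subnet,
                   double min_capacity, int same_subnet)
{
    int best = -1;
    for (int j = 0; j < n; j++) {
        int ok = same_subnet ? nodes[j].subnet == subnet
                             : adjacent(nodes[j].subnet, subnet);
        if (ok && nodes[j].capacity >= min_capacity &&
            (best < 0 || nodes[j].capacity > nodes[best].capacity))
            best = j;
    }
    return best;
}

/* Prefer a target in the task's own subnet; fall back to neighbours. */
int choose_target(const struct node *nodes, int n, int task_subnet,
                  double min_capacity)
{
    int j = best_in(nodes, n, task_subnet, min_capacity, 1);
    if (j < 0)
        j = best_in(nodes, n, task_subnet, min_capacity, 0);
    return j;
}
```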

Fig. 2. Capacity and Message Monitoring

Because of the assumed dynamic behaviour of the application and the system load, the remaining items must be obtained by monitoring software. Information about load and capacity must be collected from all nodes of the cluster, including those on which no task of the parallel application is currently running. This is accomplished by running a small monitor program (monitor slave) on each node (Figure 2).

The statistics obtained by the monitor slaves are sent to the monitor master process, which is responsible not only for maintaining the statistics of the whole cluster but also for making migration decisions. The information on communication patterns is obtained directly from the DPVM environment; for this purpose, DPVM has been extended with a message monitoring thread that keeps track of each message sent and received. These communication statistics are also sent to the monitor master process, which is depicted in detail in Figure 3.

Fig. 3. Architecture of the Monitor Master Process

The monitor master process consists of five threads that operate concurrently. The message dispatcher thread identifies each message received and appends it to the appropriate queue. There exist three different queues:

  • a node capacity queue to store the information from the monitor slaves (CPU, memory, I/O, ...),
  • a DPVM capacity queue to store the information about CPU and memory utilisation of the DPVM processes and
  • a communication queue to store the information about the communication activity between the DPVM processes.
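The dispatcher's job is pure demultiplexing: look at each incoming record's type and append it to the matching queue, so that the statistics threads can consume the queues at their own pace. Below is a single-threaded sketch with a hypothetical message layout; fixed-size arrays stand in for the real, thread-safe queues, and draining by the statistics threads is not shown.

```c
#include <assert.h>

enum msg_kind { MSG_NODE_CAPACITY, MSG_DPVM_CAPACITY, MSG_COMMUNICATION, MSG_KINDS };

/* Hypothetical monitoring record: which kind, which node/task, a value. */
struct mon_msg { enum msg_kind kind; int source; double value; };

#define QCAP 128

struct mon_queue { struct mon_msg items[QCAP]; int count; };

/* One queue per message kind, matching the three queues listed above. */
struct monitor_master { struct mon_queue q[MSG_KINDS]; };

/* Identify the message and append it to the appropriate queue.
   Returns 0 on success, -1 for an unknown kind or a full queue. */
int dispatch(struct monitor_master *m, const struct mon_msg *msg)
{
    int k = (int)msg->kind;
    if (k < 0 || k >= MSG_KINDS)
        return -1;                 /* unknown message type */
    struct mon_queue *q = &m->q[k];
    if (q->count == QCAP)
        return -1;                 /* queue full: statistics thread lagging */
    q->items[q->count++] = *msg;
    return 0;
}
```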

The queues act as intermediate storage because the statistics threads are active only once every j seconds, where j can be adapted to the application being monitored. Long-running applications do not need a short monitoring interval, so their statistics need not be updated frequently. Each statistics thread maintains a ring buffer (not shown) in which the last l entries are stored. Each entry in the ring buffer corresponds to a snapshot of the monitored data at a certain point in time. Storing all values since monitoring began is clearly impracticable; we have therefore chosen to implement a moving-average scheme over the last l entries.

This scheme has the advantage that a recursive formula can be applied, which needs only the previous average together with the newest and the oldest value in the ring buffer. This speeds up the calculation of the moving averages and reduces the monitoring overhead. To allow further processing, e.g. visualisation of the data sets, the statistical data is also written to disk (not shown).
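The recursive update works because a window average changes only by what enters and what leaves the window: with window length l, A_new = A_old + (x_newest - x_oldest) / l. Below is a minimal sketch of such a ring buffer, assuming one double-valued metric per buffer; until the window has filled, it averages over the entries seen so far. The type and function names, and the window size of 8, are our own illustrative choices.

```c
#include <assert.h>

#define WINDOW 8   /* l: number of snapshots kept (illustrative value) */

struct moving_avg {
    double ring[WINDOW];  /* last l snapshots            */
    int next;             /* slot for the next snapshot  */
    int filled;           /* number of valid entries     */
    double avg;           /* current moving average      */
};

/* Add one snapshot in O(1), using only the previous average, the
   newest value and the value that drops out of the window. */
void ma_push(struct moving_avg *m, double x)
{
    if (m->filled < WINDOW) {
        /* Window not yet full: incremental mean over filled+1 values. */
        m->filled++;
        m->avg += (x - m->avg) / m->filled;
    } else {
        double oldest = m->ring[m->next];   /* value leaving the window */
        m->avg += (x - oldest) / WINDOW;
    }
    m->ring[m->next] = x;
    m->next = (m->next + 1) % WINDOW;
}
```

Because each update adds and subtracts floating-point values, rounding error can accumulate over very long runs; occasionally recomputing the average directly from the ring buffer is a cheap safeguard.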

Migration Decider

The migration decider is the main part of the scheduler thread, which is executed periodically by the monitor master process. Based on the monitored data, the migration decider has to decide when and where to migrate a task from an overloaded node. In addition, the task to be moved imposes some constraints on the migration decision. The master load monitor therefore has to supply normalised values for the CPU, memory and disk swap space attributes of each node, as well as for the available network capacity.

The increasing interest in distributed computing has led to intensive research into load-balancing schemes for distributed-memory systems [2, 5, 10, 11, 12, 13, 14, 15]. Because not every load-balancing scheme is applicable to every application, the migration decider has been designed flexibly, to support a broad range of applications. For the first prototype we have implemented a straightforward solution with a greedy-like algorithm and constraint lists.

We denote by $c_{i,j}$ the available capacity of attribute $i$ on node $j$. Together with a priority coefficient $k_i$ for each attribute, this gives the locally available capacity $C_j$ of node $j$:

$$C_j = \sum_{i} k_i \, c_{i,j} \tag{1}$$
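Equation (1) is a weighted sum, so it is cheap to evaluate for every node in each scheduling round. A sketch for the three node attributes named above (CPU, memory, swap), assuming the c_{i,j} arrive already normalised to [0, 1] from the monitor; the attribute enumeration and function name are hypothetical. If the k_i are chosen to sum to 1, C_j also stays within [0, 1].

```c
#include <assert.h>

enum { ATTR_CPU, ATTR_MEM, ATTR_SWAP, N_ATTR };

/* C_j = sum_i k_i * c_{i,j}: weighted available capacity of node j.
   k[i] is the priority coefficient of attribute i and c[i] the
   normalised available capacity of attribute i on this node. */
double node_capacity(const double k[N_ATTR], const double c[N_ATTR])
{
    double sum = 0.0;
    for (int i = 0; i < N_ATTR; i++) {
        assert(c[i] >= 0.0 && c[i] <= 1.0);  /* normalised input */
        sum += k[i] * c[i];
    }
    return sum;
}
```

A CPU- and memory-hungry code such as Pam-Crash would put most of the weight on k for CPU and memory, so that a node with plenty of free swap but a busy processor still ranks low.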

Using the priority coefficients $k_i$, we can adapt the load-balancing scheme to the needs of different applications. Applications with a high demand for CPU and memory capacity, like Pam-Crash, will use high values for these coefficients. The $C_j$ are then sorted: sorting them in descending order yields the capacity room C, and sorting them in ascending order yields the load room L. Each of these data sets is managed as a priority queue (heap), as indicated in Figure 4.

Fig. 4. Architecture of the Migration Decider

The migration decider only looks at the first element of each heap. The first element of the capacity room C represents the node with the highest available capacity, whereas the first element of the load room L represents the most heavily burdened node. A migration is triggered if the following conditions are met:

$$L_1 > T \quad \text{and} \quad C_1 > 1 - T,$$

where $L_1$ and $C_1$ are the first elements of the load room and the capacity room over the $n$ nodes, and $T$ denotes an application-specific threshold level for task migration. By using heaps for data management, the migration decider can retrieve the essential information with minimal effort, in $O(1)$ time. Updating the elements of a data room can be done in $O(n \log n)$. Other schemes (e.g. linked lists) may give faster access to the data elements when only a few tasks have to be considered, but heaps do not limit us to supporting a small number of tasks.
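The O(1) access comes from the heap property: the extreme element always sits at the root. Below is a sketch of both rooms as array-based max-heaps with the trigger test applied to their roots. The names and the fixed size are hypothetical, and so is keying the load room on 1 - C_j (so that the most burdened node sits at its root); the text does not state how the load index is derived.

```c
#include <assert.h>

#define MAX_NODES 256

/* Array-based binary max-heap of (key, node) pairs. */
struct heap { double key[MAX_NODES]; int node[MAX_NODES]; int n; };

static void swap_at(struct heap *h, int a, int b)
{
    double k = h->key[a]; h->key[a] = h->key[b]; h->key[b] = k;
    int v = h->node[a]; h->node[a] = h->node[b]; h->node[b] = v;
}

/* Insert in O(log n) by sifting the new entry up. */
void heap_push(struct heap *h, double key, int node)
{
    assert(h->n < MAX_NODES);
    int i = h->n++;
    h->key[i] = key;
    h->node[i] = node;
    while (i > 0 && h->key[(i - 1) / 2] < h->key[i]) {
        swap_at(h, i, (i - 1) / 2);
        i = (i - 1) / 2;
    }
}

/* Root = largest key, available in O(1). */
double heap_top_key(const struct heap *h) { assert(h->n > 0); return h->key[0]; }
int heap_top_node(const struct heap *h) { assert(h->n > 0); return h->node[0]; }

/* Build both rooms from the C_j values and apply L_1 > T and C_1 > 1 - T.
   On success, writes the proposed source and target nodes and returns 1. */
int should_migrate(const double *C, int n, double T, int *from, int *to)
{
    static struct heap cap, load;   /* static: keeps large arrays off the stack */
    if (n <= 0)
        return 0;
    assert(n <= MAX_NODES);
    cap.n = load.n = 0;
    for (int j = 0; j < n; j++) {
        heap_push(&cap, C[j], j);          /* capacity room: max C_j   */
        heap_push(&load, 1.0 - C[j], j);   /* load room: max (1 - C_j) */
    }
    if (heap_top_key(&load) > T && heap_top_key(&cap) > 1.0 - T) {
        *from = heap_top_node(&load);
        *to = heap_top_node(&cap);
        return 1;
    }
    return 0;
}
```

Rebuilding both heaps costs O(n log n), in line with the update effort quoted above, while each trigger test only reads the two roots.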

As illustrated in Figure 5, the algorithm of the decider is straightforward. The function CheckForMigration is called periodically to check whether the load index of the most loaded node exceeds a user-defined threshold level and, if so, whether a node exists that has enough remaining capacity (migration mapping). Once the decision to migrate has been taken, the tasks are moved from the 'overloaded' node to the node with the most capacity left. Both data rooms are then reordered by setting the load and capacity indices of the affected nodes to default values and re-sorting the heaps. Because the algorithm is recursive, the whole migration is done in one global step. As a result, the application uses the whole workstation cluster efficiently, and expensive compute time is not wasted migrating single tasks one at a time.

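/* Helper routines of the monitor master, defined elsewhere. */
extern double Threshold;                        /* user-defined threshold T   */
double GetMaxLoadFromListOfLoadedNodes(void);   /* root of the load room      */
double GetBestCapacity(void);                   /* root of the capacity room  */
void DoMigrationStuff(void);
void UpdateLoadRoom(void);
void UpdateCapacityRoom(void);

void CheckForMigration(void)
{
    /* Triggered at least every t seconds. */
    if (GetMaxLoadFromListOfLoadedNodes() <= Threshold)
        return;
    if (GetBestCapacity() > 1.0 - Threshold) {
        /* A less burdened node exists: perform the migration. */
        DoMigrationStuff();
        UpdateLoadRoom();       /* effort: O(n log n) */
        UpdateCapacityRoom();   /* effort: O(n log n) */
        CheckForMigration();    /* recursion          */
    }
}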

Fig. 5. Pseudo-Code of the Recursive Algorithm for the Decider Module
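To see the recursion terminate, here is a self-contained simulation of the decider in which each node's available capacity is a single number, the migration step moves one task of fixed cost from the most loaded node to the node with the most capacity left, and the two rooms are recomputed by linear scans instead of heaps. The cost model, the variable names and the extra "no overshoot" test are our assumptions; only the control flow follows Figure 5. The extra test guarantees termination in this toy model, because every move strictly reduces the spread of the capacities.

```c
#include <assert.h>

#define N_NODES 4

static double cap[N_NODES];   /* C_j: available capacity per node        */
static double threshold;      /* T: application-specific threshold       */
static double task_cost;      /* capacity consumed by one task (assumed) */
static int migrations;        /* number of simulated task moves          */

static int most_loaded(void)   /* node with the least capacity left */
{
    int j = 0;
    for (int i = 1; i < N_NODES; i++) if (cap[i] < cap[j]) j = i;
    return j;
}

static int best_capacity(void) /* node with the most capacity left */
{
    int j = 0;
    for (int i = 1; i < N_NODES; i++) if (cap[i] > cap[j]) j = i;
    return j;
}

void CheckForMigration(void)
{
    int from = most_loaded(), to = best_capacity();
    if (1.0 - cap[from] <= threshold)          /* load of most loaded node */
        return;
    if (cap[to] > 1.0 - threshold &&
        cap[to] - task_cost >= cap[from] + task_cost) { /* no overshoot */
        cap[from] += task_cost;                /* DoMigrationStuff()       */
        cap[to] -= task_cost;
        migrations++;
        CheckForMigration();                   /* recurse on updated rooms */
    }
}
```

With capacities {0.125, 0.875, 0.5, 0.5}, T = 0.625 and a task cost of 0.125, two tasks move from node 0 to node 1 in a single call, after which node 0's load equals the threshold and the recursion stops.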

Job preparation

As with every parallel application, an application using the DYNAMITE environment must be split into separate tasks, which must be started on the nodes of the assigned cluster. In FEM applications such as Pam-Crash [16], and in many others, this is usually accomplished by partitioning the problem data over the available nodes in proportion to each node's capacity. This results in a tight fit, which is fine as long as there are no variations in load or capacity. For DYNAMITE we are considering two other approaches:

1. Sparse decomposition. When the aim is to allow any one workstation from a pool of (equal) workstations to be temporarily used for other purposes, the application should be split into fewer subtasks than there are available nodes. In this way, flexibility is gained at a cost in performance.

2. Redundant decomposition. When the aim is to allow for the redistribution of work in an application that produces a dynamically changing load, it may be preferable to split the data so that every workstation gets more than one partition. In this way load can easily be shifted, albeit at a cost in communication efficiency.
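The benefit of redundant decomposition is granularity: with k partitions per workstation, work can be shifted in steps of 1/k of a workstation's share instead of whole tasks. A sketch that deals out P = k * N partitions round-robin and then greedily moves single partitions from the node that finishes last to the node that would finish earliest, as long as that helps. The cost model (time = partitions held / relative node speed) and all names are hypothetical, and the extra communication cost mentioned above is not modelled.

```c
#include <assert.h>

#define MAX_NODES 16
#define MAX_PARTS 256

/* Redundant decomposition: P = k * N partitions dealt out round-robin. */
void assign_round_robin(int *owner, int n_parts, int n_nodes)
{
    assert(n_nodes > 0 && n_nodes <= MAX_NODES && n_parts <= MAX_PARTS);
    for (int p = 0; p < n_parts; p++)
        owner[p] = p % n_nodes;
}

/* Greedily move single partitions from the node that finishes last to the
   node that would finish earliest after receiving one, as long as that
   shortens the slowest node's time. Time = partitions held / speed. */
void rebalance(int *owner, int n_parts, const double *speed, int n_nodes)
{
    assert(n_nodes > 0 && n_nodes <= MAX_NODES);
    for (;;) {
        int count[MAX_NODES] = {0};
        for (int p = 0; p < n_parts; p++)
            count[owner[p]]++;

        int worst = 0, best = 0;
        for (int j = 1; j < n_nodes; j++) {
            if (count[j] / speed[j] > count[worst] / speed[worst])
                worst = j;
            if ((count[j] + 1) / speed[j] < (count[best] + 1) / speed[best])
                best = j;
        }
        if (best == worst ||
            (count[best] + 1) / speed[best] >= count[worst] / speed[worst])
            return;                          /* no single move helps any more */

        for (int p = 0; p < n_parts; p++)    /* hand over one partition */
            if (owner[p] == worst) { owner[p] = best; break; }
    }
}
```

For example, with two workstations and k = 4, if one of them drops to half speed, one of its four partitions moves to the other node (a 5/3 split). With k = 1 no single move would help, and the slowed workstation would dictate the run time.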

Besides this additional choice in partitioning, running an application under DYNAMITE also requires the monitoring tasks to be started together with the DPVM system. Although this need not require any additional effort from the user, we will provide a simple GUI to assist users in starting their DYNAMITE-enabled applications.

Conclusions

DYNAMITE will provide the application developer with a robust tool that makes it possible to respond flexibly to dynamic changes in the available system capacity and application workload. The DYNAMITE system will migrate (dynamically linked) tasks from a parallel program when necessary. The overhead involved will be very small compared to the possible cost of a load imbalance. The system structure is modular so that it can easily be adapted to specific application requirements. In the development phase this modularity will be used for experimentation with various migration policies.


G.D. van Albada, Department of Computer Science, Universiteit van Amsterdam, Kruislaan 403, 1098 SJ Amsterdam, The Netherlands;
J. Clinckemaillie, Engineering Systems International, 20 Rue Saarinen, F-94578 Rungis SILIC 270, France;
A.H.L. Emmen, Genias Benelux BV, James Stewartstraat 248, 1325 JN Almere, The Netherlands;
J. Gehring, Paderborn Center for Parallel Computing, Fuerstenallee 11, 33102 Paderborn, Germany;
O. Heinz, Paderborn Center for Parallel Computing, Fuerstenallee 11, 33102 Paderborn, Germany;
F. van der Linden, Department of Computer Science, Universiteit van Amsterdam, Kruislaan 403, 1098 SJ Amsterdam, The Netherlands;
B.J. Overeinder, Department of Computer Science, Universiteit van Amsterdam, Kruislaan 403, 1098 SJ Amsterdam, The Netherlands;
A. Reinefeld, Konrad-Zuse-Zentrum fuer Informationstechnik, Takustrasse 7, D-14195 Berlin, Germany;
P.M.A. Sloot, Department of Computer Science, Universiteit van Amsterdam, Kruislaan 403, 1098 SJ Amsterdam, The Netherlands