Многопотоковое программирование

Многопотоковое программирование предложено в качестве средства разработки параллельных программ для многопроцессорных систем (систем с разделяемой памятью). При этом реальное разнесение потоков управления на разные процессоры - задача ОС. Фирма SUN Microsystems для поддержки потоков (нитей) управления реализовала легковесные процессы LWP (LightWeight Processes). Диспетчирование LWP - практически не управляемая пользователем процедура. Потоки характеризуются следующими атрибутами:

Предложены 2 API потокового программирования:

Здесь рассматривается вариант POSIX. Все функции этого варианта имеют в своих именах префикс pthread_ и объявлены в заголовочном файле pthread.h.

Создание потока управления


int pthread_create (pthread_t *tid_p, const pthread_attr_t *attr_p,
		void *(*func_p)(void *), void *arg_p)

Создает новый поток для функции, заданной параметром func_p. Эта функция имеет аргументом указатель (void *) и возвращает значение того же типа. Реально же в функцию передается аргумент arg_p. Идентификатор нового потока возвращается через tid_p.

Аргумент attr_p указывает на структуру, задающую атрибуты вновь создаваемого потока. Если attr_p=NULL, то используются атрибуты "по умолчанию" (но это плохая практика, т.к. в разных ОС эти значения могут быть различными, хотя декларируется обратное). Одна структура, указываемая attr_p, может использоваться для управления несколькими потоками.

Инициализация атрибутов потока


int pthread_attr_init (pthread_attr_t *attr_p)

Инициализирует структуру, указываемую attr_p, значениями "по умолчанию" (при этом распределяется кое-какая память).

Не будем обсуждать все атрибуты и подробности их использования, дадим лишь список и поясним два из них.

  1. Область действия конкуренции (scope) [PTHREAD_SCOPE_PROCESS] - определяет связность потока с LWP.
  2. Отсоединенность (detachstate) [PTHREAD_CREATE_JOINABLE] - определяет то, может или нет какой-либо другой поток ожидать окончания данного (посредством функции).
  3. Адрес динамического стека потока (stackaddr) [NULL].
  4. Размер динамического стека потока(stacksize) [1 Mb].
  5. Приоритет потока (priority) [наследуется от потока-родителя].
  6. Правила и параметры планирования. Неприятно то, что schedpolicy по умолчанию устанавливается в SCHED_OTHER, зависимую от ОС.

Освобождение памяти атрибутов потока


int pthread_attr_destroy (pthread_attr_t *attr_p)

Область конкуренции


int pthread_attr_setscope (pthread_attr_t *attr_p, int scope)
int pthread_attr_getscope (pthread_attr_t *attr_p, int *scope)

scope может принимать два значения:
PTHREAD_SCOPE_PROCESS - для несвязанного потока;
PTHREAD_SCOPE_SYSTEM - для связанного потока.

Состояние отсоединенности


int pthread_attr_setdetachstate (pthread_attr_t *attr_p, int detachstate)
int pthread_attr_getdetachstate (pthread_attr_t *attr_p, int *detachstate)

detachstate может принимать два значения:
PTHREAD_CREATE_DETACHED - для отсоединеного потока;
PTHREAD_CREATE_JOINABLE - для присоединенного потока.

Для отсоединенного потока невозможно его ожидание его окончания другим потоком, поэтому после окончания такого потока все его ресурсы могут быть освобождены (и использованы заново).

Завершение потока

В потоках можно использовать стандартную функцию exit(), однако это ведет к немедленному завершению всех потоков и процесса в целом.

Поток завершается вместе с вызовом return() в функции, вызванной pthread_create().

Поток заканчивает свое выполнение также с помощью функции


pthread_exit (void *status),
допустимо в качестве status использовать NULL.

Поток может быть завершен другим потоком посредством функции pthread_cancel() (с этой функцией работают pthread_setcanceltype, pthread_setcancelstate и pthread_testcancel).

Ожидание завершения потока


int pthread_join (pthread_t tid, void **status)

Вызывающий поток блокируется до окончания потока с идентификатором tid. Поток с идентификатором tid не может быть отсоединенным

Получение идентификатора потока


pthread_t pthread_self (void)

Передача управления другому потоку


int sched_yield (void)

Передает управление другому потоку, имеющему приоритет равный или больший приоритета вызывающего потока.

Посылка сигнала потоку


int pthread_kill (pthread_t tid, int signum)

Посылает сигнал с идентификатором signum в поток, задаваемый идентификатором tid.

Манипулирование сигнальной маской потока


int pthread_sigmask (int mode, sigset_t *set_p, sigset_t *old_p)

Изменяет сигнальную маску потока в соответствии с аргументом mode, который может принимать следующие значения:

Если значение аргумента old_p не равно NULL, то в область памяти, указываемую old_p, помещается предыдущее содержимое сигнальной маски.

Объекты синхронизации потоков управления

Потоки используют единое адресное пространство. Это означает, что все статические переменные доступны потокам в любой момент. Поэтому необходимы средства управления доступом к совместно используемым данным. Здесь возможно использование стандартных средств синхронизации различных процессов: каналы, очереди сообщений, межпроцессные семафоры. Однако, специально для межпотокового взаимодействия предложены индивидуальные средства:

Указанные средства перечислены в порядке ухудшения их эффективности.

Заметим, что доступ к атомарным данным (char, int, double) реализуется за один такт процессора, поэтому существуют ситуации (зависящие от логики программы), когда такие данные сами могут выступать в качестве средства синхронизации.

Взамоисключающие блокировки


int pthread_mutex_init (pthread_mutex_t *mp, const pthread_mutex_attr_t *mattrp)
инициализирует взаимоисключающую блокировку, выделяя необходимую память. Если mattrp=NULL, то создается блокировка с атрибутами "по умолчанию". В настоящее время атрибут один - область действия блокировки, его умолчательное значение - PTHREAD_PROCESS_PRIVATE (а может быть еще PTHREAD_PROCESS_SHARED).


int pthread_mutex_destroy (pthread_mutex_t *mp)
разрушает блокировку, освобождая выделенную память.


int pthread_mutex_lock (pthread_mutex_t *mp)
int pthread_mutex_unlock (pthread_mutex_t *mp)
int pthread_mutex_trylock (pthread_mutex_t *mp)

С помощью pthread_mutex_lock() поток пытается захватить блокировку. Если же блокировка уже принадлежит другому потоку, то вызывающий поток ставится в очередь (с учетом приоритетов потоков) к блокировке. После возврата из функции pthread_mutex_lock() блокировка будет принадлежать вызывающему потоку.

Функция pthread_mutex_unlock() освобождает захваченную ранее блокировку. Освободить блокировку может только ее владелец.

Функция pthread_mutex_trylock() - неблокирующая версия функции pthread_mutex_lock(). Если на момент обращения к этой функции блокировка уже захвачена, то происходит немедленный возврат из функции со значением EBUSY.

Условные переменные

Применяются в сочетании со взаимоисключающими блокировками. Общая схема использования такова. Один поток устанавливает взаимоисключающую блокировку и затем блокирует себя по условной переменной (путем вызова функции pthread_cond_wait()), при этом автоматически (но временно) освобождается взаимоисключающая блокировка. Когда какой-либо другой поток посредством вызова функции pthread_cond_signal() сигнализирует по условной переменной, то первый поток разблокируется и ему возвращается во владение взаимоисключающая блокировка.


int pthread_cond_init (pthread_cond_t *cvp, const pthread_condattr_t *cattrp)
инициализирует условную переменную, выделяя память.


int pthread_cond_destroy (pthread_cond_t *cvp)
разрушает условную переменную, освобождая память.


int pthread_cond_wait (pthread_cond_t *cvp, const pthread_mutex_t *mp)
автоматически освобождает взаимоисключающую блокировку, указанную mp, а вызывающий поток блокируется по условной переменной, заданной cvp. Заблокированный поток разблокируется функциями pthread_cond_signal() и pthread_cond_broadcast(). Одной условной переменной могут быть заблокированы несколько потоков.


int pthread_cond_timedwait (pthread_cond_t *cvp, const pthread_mutex_t *mp, struct timespec *tp)
аналогична функции pthread_cond_wait(), но имеет третий аргумент, задающий интервал времени, после которого поток разблокируется (если этого не было сделано ранее).


int pthread_cond_signal (pthread_cond_t *cvp)
разблокирует ожидающий данную условную переменную поток. Если сигнала по условной переменной ожидают несколько потоков, то будет разблокирован только какой-либо один из них.


int pthread_cond_broadcast (pthread_cond_t *cvp)
разблокирует все потоки, ожидающие данную условную переменную.

Семафоры

Семафор представляет собой целочисленную переменную. Потоки могут наращивать (post) и уменьшать (wait) ее значение на единицу. Если поток пытается уменьшить семафор так, что его значение становится отрицательным, то поток блокируется. Поток будет разблокирован, когда какой-либо другой поток не увеличит значение семафора так, что он станет неотрицательным после уменьшения его первым (заблокированным) потоком.

Потоки похожи на взаимоисключающие блокировки и условные переменные, но отличаются от них тем, что у них нет "владельца", т.е. изменить значение семафора может любой поток.

В POSIX-версии средств многопотокового программирования используются те же самые семафоры, что и для межпроцессного взаимодействия.


#include <semaphore.h>
int sem_init (sem_t *sp, int pshared, unsigned int value)
инициализирует семафор, указанный аргументом sp, значением value. Если pshared=0, то область действия семафора - только один процесс, иначе - несколько процессов.


int sem_destroy (sem_t *sp)
разрушает семафор.


int sem_post (sem_t *sp)
увеличивает значение семафора на 1, при этом может быть разблокирован один (из, возможно, нескольких) поток (какой именно не определено).


int sem_wait (sem_t *sp)
пытается уменьшить значение семафора на 1. Если при этом значение семафора должно стать отрицательным, то поток блокируется.


int sem_trywait (sem_t *sp)
неблокирующая версия функции sem_wait().

Барьеры

Барьер используется для синхронизации работы нескольких потоков управления. Барьер характеризуется натуральным числом count, задающим количество синхронизируемых потоков. Поток управления, "подошедший" к барьеру (обратившийся к функции pthread_barrier), блокируется до момента накопления перед этим барьером указанного количества потоков count.


int pthread_barrier_init(pthread_barrier_t *bp, pthread_barrierattr_t *attr, unsigned count)
инициализирует барьер, выделяя необходимую память, устанавливая значения его атрибутов и назначая count "шириной" барьера. В настоящее время атрибуты барьеров не определены поэтому в качестве второго параметра функции pthread_barrier_init следует использовать NULL.


int pthread_barrier_destroy(pthread_barrier_t *bp)
разрушает барьер, освобождая выделенную память.


int pthread_barrier_wait(pthread_barrier_t *bp)
приостанавливает вызвавший данную функцию поток до момента накопления перед барьером count потоков. Заблокированный поток может быть прерван сигналом, при этом обработчик сигнала (если он был назначен) будет вызван на выполнение обычным образом. Выход из обработчика вернет поток в состояние ожидания, если к этому моменту требуемое количество count потоков еще не скопилось перед барьером.

Пример многопотоковой программы

Пусть наша программа состоит из трех четко функционально выделенных модулей:

Поскольку генератор данных производит их порциями, то имеет смысл сделать программу параллельной для быстрейшего выполнения на многопроцессорной системе.

Отметим 2 важные характеристики нашей программы, существенно облегчающих ее разработку:

Подчеркнем, однако, что во многих реальных задачах ситуация обратная (см. задания на лабораторные работы):

Конкретизируем наш пример:

Для обмена данными в паре producer-invertor используется буфер msgbuf1[], а в паре invertor-consumer - буфер msgbuf2[]. Количество байт данных в буферах определяется переменными msglen1 и msglen2.

Наша параллельная программа будет реализована в рамках четырех потоков:

Примечание. При рачительном программировании функцию потока producer (invertor или consumer) имеет смысл возложить на поток main, тем самым уменьшив количество потоков на 1.

Понятно, что основную сложность в реализации данного примера составляет синхронизация: поток-приемник должен уловить момент, в который поток-источник подготовил для него целостные данные.

Проще всего (но хуже всего даже в нашем случае) использовать для синхронизации значения атомарных целых msglen1 и msglen2. Идея проста:

  1. поток-поставщик готовит данные в буфере обмена, сохраняя в переменной длины msglen* нулевое значение;
  2. поток-приемник в бесконечном цикле опрашивает переменную длины msglen*, дожидаясь ее ненулевого значения;
  3. поток-поставщик, загрузив целостные данные в буфер обмена, за один такт процессора устанавливает перменную msglen* в актуальное значение;
  4. поток-приемник, "снюхав" ненулевое значение в msglen*, начинает освобождение (копированием или обработкой на месте) обменного буфера, после наступления момента "ненужности" входного буфера поток-приемник сбрасывает переменную длины msglen* в ноль;
  5. поток-источник, восприняв переключение msglen* в ноль, начинает готовить новую порцию данных в обменном буфере.

Данный механизм будет, конечно, работать, но очень плохо. Минус - в неопределенных циклах ожидания переключений значений переменных msglen*, которые так же загружают процессоры, как и полезные действия. Внимание: необходимо также глубоко задуматься о "благе", которое дает использование кэша в каждом процессоре.

Представленная ниже версия программы свободна от этого недостатка. Использование взаимоисключающих блокировок (mutx1 и mutx2) и условных переменных (condx1 и condx2) полностью исключает накладные расходы (в нашей программе, но не в системе), необходимые для синхронизации работы пар "поставщик-приемник".


/* Compilation: gcc -o prg prg.c -lpthread -lrt */

#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#define _REENTRANT
#include <pthread.h>
#include <sched.h>	// For sched_yield only

#define BUF_SIZE 256

pthread_mutex_t	mutx1, mutx2;
pthread_cond_t	condx1, condx2;
int	msglen1=0, msglen2=0, done=0;
char	msgbuf1[BUF_SIZE], msgbuf2[BUF_SIZE];

void *
producer (void *arg_p) {
write (1, "P start\n", 8);
pthread_mutex_lock(&mutx1);
  while (1) {
    msglen1 = read (0, msgbuf1, BUF_SIZE);
    if (msglen1 == 0) break;
    pthread_cond_wait(&condx1, &mutx1);
    };	// End while(1)
  done = 1;
  pthread_mutex_unlock(&mutx1);
  pthread_exit(0); 
  }

void *
invertor (void *arg_p) {
  char c, buf[BUF_SIZE];
  int i, l;

write (1, "I start\n", 8);
  pthread_mutex_lock(&mutx2);
  while (1) {
    pthread_mutex_lock(&mutx1);
    if (done) {
      pthread_mutex_unlock(&mutx1);
      pthread_mutex_unlock(&mutx2); 
      pthread_exit(0);
      };
    strncpy (buf, msgbuf1, msglen1);
    l = msglen1;
    msglen1 = 0;
    pthread_mutex_unlock(&mutx1);
    pthread_cond_signal(&condx1);
    write (1, "I\n", 2);
    for (i=0; i<(l/2); i++) {
      c = buf[i]; buf[i] = buf[l-2-i]; buf[l-2-i] = c;
      };
    msglen2 = l;
    strncpy (msgbuf2, buf, msglen2);
    pthread_cond_wait(&condx2, &mutx2);
    };	// End while(1)
  }

void *
consumer (void *arg_p) {
write (1, "C start\n", 8);
  while (1) {
    pthread_mutex_lock(&mutx2);
    if (done) {
      pthread_mutex_unlock(&mutx2); 
      pthread_exit(0);
      };
    write (1, "C\n", 2);
    write (1, msgbuf2, msglen2);
    msglen2 = 0;
    pthread_mutex_unlock(&mutx2);
    pthread_cond_signal(&condx2);
    sched_yield();
    };	// End while(1)
  }

int
main (int argc, char* argv[]) {
  pthread_t tid, ptid, itid, ctid;
  pthread_attr_t pattr;
  int ret;
  
  pthread_attr_init (&pattr);
  pthread_attr_setscope (&pattr, PTHREAD_SCOPE_SYSTEM);
  pthread_attr_setdetachstate (&pattr,PTHREAD_CREATE_JOINABLE);
  
  pthread_mutex_init (&mutx1, NULL);
  pthread_cond_init (&condx1, NULL);
  pthread_mutex_init (&mutx2, NULL);
  pthread_cond_init (&condx2, NULL);

  if ( ret=pthread_create (&ptid, &pattr, producer, NULL) )
    perror("pthread_create");
  sleep(1);
  if ( ret=pthread_create (&itid, &pattr, invertor, NULL) )
    perror("pthread_create");
  sleep(1);
  if ( ret=pthread_create (&ctid, &pattr, consumer, NULL) )
    perror("pthread_create");
  
  pthread_join(ctid, NULL);
  
  pthread_mutex_destroy (&mutx1);
  pthread_cond_destroy (&condx1);
  pthread_mutex_destroy (&mutx2);
  pthread_cond_destroy (&condx2);
  
  pthread_exit(0);
  }

Реализованный в представленной выше программе механизм синхронизации "идеален" в ситуации, когда функциональность потока ограничена только непосредственной обработкой транзитных через него данных и передачей результата потоку-приемнику.

Представим, однако, что какой-либо поток (например, invertor) реализует дополнительные (а может быть для самого потока более важные) действия (например, расчет статистических характеристик проходящих через поток текстов). Это значит, что поток invertor должен оперативно обработать входные данные и передать результат на выход, а статистические расчеты должен выполнять в промежутки ожидания готовности входных данных.

Приведенная ниже программа позволяет (с некоторыми усилиями) организовать "дополнительную" обработку в потоках invertor и consumer в неопределенных циклах while(!msglen*).


/* Compilation: gcc -o prg prg.c -lpthread -lrt */

#include <stdlib .h>
#include <unistd .h>
#include <stdio .h>
#include <string .h>
#define _REENTRANT
#include <pthread .h>
#include <sched .h>

#define BUF_SIZE 256

pthread_mutex_t	mutx1, mutx2;
pthread_cond_t	condx1, condx2;
int	msglen1=0, msglen2=0, done=0;
char	msgbuf1[BUF_SIZE], msgbuf2[BUF_SIZE];

void *
producer (void *arg_p) {
write (1, "P start\n", 8);
pthread_mutex_lock(&mutx1);
  while (1) {
    msglen1 = read (0, msgbuf1, BUF_SIZE);
    if (msglen1 == 0) break;
    pthread_cond_wait(&condx1, &mutx1);
    };	// End while(1)
  done = 1;
  pthread_mutex_unlock(&mutx1);
  pthread_exit(0); 
  }

void *
invertor (void *arg_p) {
  char c, buf[BUF_SIZE];
  int i, l;

write (1, "I start\n", 8);
  pthread_mutex_lock(&mutx2);
  while (1) {
    while ( !msglen1 ) {
      if (done) {
        pthread_mutex_unlock(&mutx2); 
        pthread_exit(0);
        };
/* Здесь полезные действия, выполняемые короткими шагами */
      };
    pthread_mutex_lock(&mutx1);
    strncpy (buf, msgbuf1, msglen1);
    l = msglen1;
    msglen1 = 0;
    pthread_mutex_unlock(&mutx1);
    pthread_cond_signal(&condx1);
    for (i=0; i<(l/2); i++) {
      c = buf[i]; buf[i] = buf[l-2-i]; buf[l-2-i] = c;
      };
    msglen2 = l;
    strncpy (msgbuf2, buf, msglen2);
    pthread_cond_wait(&condx2, &mutx2);
    };	// End while(1)
  }

void *
consumer (void *arg_p) {
write (1, "C start\n", 8);
  while (1) {
    while ( !msglen2 ) {
      if (done) {
        pthread_exit(0);
        };
/* Здесь полезные действия, выполняемые короткими шагами */
      };
    pthread_mutex_lock(&mutx2);
    write (1, msgbuf2, msglen2);
    msglen2 = 0;
    pthread_mutex_unlock(&mutx2);
    pthread_cond_signal(&condx2);
    };	// End while(1)
  }

int
main (int argc, char* argv[]) {
  pthread_t tid, ptid, itid, ctid;
  pthread_attr_t pattr;
  int ret;
  
  pthread_attr_init (&pattr);
  pthread_attr_setscope (&pattr, PTHREAD_SCOPE_SYSTEM);
  pthread_attr_setdetachstate (&pattr,PTHREAD_CREATE_JOINABLE);
  
  pthread_mutex_init (&mutx1, NULL);
  pthread_cond_init (&condx1, NULL);
  pthread_mutex_init (&mutx2, NULL);
  pthread_cond_init (&condx2, NULL);

  if ( ret=pthread_create (&ptid, &pattr, producer, NULL) )
    perror("pthread_create");
  if ( ret=pthread_create (&itid, &pattr, invertor, NULL) )
    perror("pthread_create");
  if ( ret=pthread_create (&ctid, NULL, consumer, NULL) )
    perror("pthread_create");
  
  pthread_join(ctid, NULL);
  
  pthread_mutex_destroy (&mutx1);
  pthread_cond_destroy (&condx1);
  pthread_mutex_destroy (&mutx2);
  pthread_cond_destroy (&condx2);
  
  pthread_exit(0);
  }

Недостаток приведенной программы заключается в том, что "немагистральная" обработка в потоках invertor и consumer должна быть реализована краткосрочными тактами, иначе задержка в обработке транзитных данных может стать недопустимой.

Правильный способ реализации эффективной программы, отвечающей требованиям оперативности обработки входных данных и полноты решения самостоятельных задач в потоках, заключается в использовании асинхронных сигналов. Идея такова:

Упрощенный вариант (скорее, схема) такой программы представлен ниже.


/* Compilation: gcc -o prg prg.c -lpthread -lrt */

#include <stdlib .h>
#include <unistd .h>
#include <stdio .h>
#include <string .h>
#include <signal .h>
#define _REENTRANT
#include <pthread .h>
#include <sched .h>

#define BUF_SIZE 256

extern int errno;

pthread_t tid, ptid, itid, ctid;
pthread_mutex_t	mutx1, mutx2;
pthread_cond_t	condx1, condx2;
int	msglen1=0, msglen2=0, done=0;
char	msgbuf1[BUF_SIZE], msgbuf2[BUF_SIZE];

void
invert (int sig) {
  char c, buf[BUF_SIZE];
  int i,l;

  write (1, "Sig cautched\n", 13);

  if (done) {
    pthread_mutex_unlock(&mutx2); 
    pthread_kill(ctid, SIGUSR1);
    pthread_exit(0);
    };

  pthread_mutex_lock(&mutx1);
  strncpy (buf, msgbuf1, msglen1);
  l = msglen1;
  msglen1 = 0;
  pthread_mutex_unlock(&mutx1);
  pthread_cond_signal(&condx1);
  for (i=0; i<(l/2); i++) {
    c = buf[i]; buf[i] = buf[l-2-i]; buf[l-2-i] = c;
    };
  write (1, buf, l);
  strncpy (msgbuf2, buf, l);
  msglen2 = l;
  if ( i = pthread_kill(ctid, SIGUSR1) ) {
    fprintf(stderr, "Ret for pthread_kill = %d\n", i);
    };
  pthread_cond_wait(&condx2, &mutx2);
  }

void *
producer (void *arg_p) {
write (1, "P start\n", 8);
pthread_mutex_lock(&mutx1);
  while (1) {
    msglen1 = read (0, msgbuf1, BUF_SIZE);
    if (msglen1 == 0) break;
    pthread_kill(itid, SIGUSR1);
    pthread_cond_wait(&condx1, &mutx1);
    };  // End while(1)
  done = 1;
  pthread_mutex_unlock(&mutx1);
  pthread_kill(itid, SIGUSR1);
  pthread_exit(0); 
  }

void *
invertor (void *arg_p) {
  struct sigaction act;

write (1, "I start\n", 8);

  act.sa_handler = invert;
  sigaction(SIGUSR1, &act, NULL);
  pthread_mutex_lock(&mutx2);
  while (1) { // Этот цикл имитирует собственную функциональность
    sleep(1);
    write (1, "I working\n", 10);
    };  // End while(1)
  }

void *
consumer (void *arg_p) {
  sigset_t ss;
  int i;

write (1, "C start\n", 8);
  sigemptyset(&ss);
  sigaddset(&ss, SIGUSR1);
//  pthread_sigmask(SIG_UNBLOCK, &ss, NULL);

  while (1) {
    while ( !msglen2 ) {
      if (done) {
        pthread_exit(0);
        };
      sigwait(&ss, &i);
      };
    pthread_mutex_lock(&mutx2);
    write (1, msgbuf2, msglen2);
    msglen2 = 0;
    pthread_mutex_unlock(&mutx2);
    pthread_cond_signal(&condx2);
    };  // End while(1)
  }


int
main (int argc, char* argv[]) {
  pthread_attr_t pattr;
  int ret;
  
  pthread_attr_init (&pattr);
  pthread_attr_setscope (&pattr, PTHREAD_SCOPE_SYSTEM);
  pthread_attr_setdetachstate (&pattr,PTHREAD_CREATE_JOINABLE);
  
  pthread_mutex_init (&mutx1, NULL);
  pthread_cond_init (&condx1, NULL);
  pthread_mutex_init (&mutx2, NULL);
  pthread_cond_init (&condx2, NULL);

  if ( ret=pthread_create (&ptid, &pattr, producer, NULL) )
    perror("pthread_create");
fprintf (stderr, "ptid = %d\n", ptid);
  if ( ret=pthread_create (&itid, &pattr, invertor, NULL) )
    perror("pthread_create");
fprintf (stderr, "itid = %d\n", itid);
  if ( ret=pthread_create (&ctid, NULL, consumer, NULL) )
    perror("pthread_create");
fprintf (stderr, "ctid = %d\n", ctid);
  
  pthread_join(ctid, NULL);
  
  pthread_mutex_destroy (&mutx1);
  pthread_cond_destroy (&condx1);
  pthread_mutex_destroy (&mutx2);
  pthread_cond_destroy (&condx2);
  
  pthread_exit(0);
  }