Browse Source

Use condition variables instead of semaphores in fifo.

master
Rui Batista 8 years ago
parent
commit
1bbba9bc3b
1 changed files with 72 additions and 48 deletions
  1. 72
    48
      src/libespeak-ng/fifo.c

+ 72
- 48
src/libespeak-ng/fifo.c View File

#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
#include <pthread.h> #include <pthread.h>
#include <semaphore.h>
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
// my_stop_is_required, and the command fifo // my_stop_is_required, and the command fifo
static pthread_mutex_t my_mutex; static pthread_mutex_t my_mutex;
static int my_command_is_running = 0; static int my_command_is_running = 0;
static pthread_cond_t my_cond_command_is_running;
static int my_stop_is_required = 0; static int my_stop_is_required = 0;


// my_thread: reads commands from the fifo, and runs them. // my_thread: reads commands from the fifo, and runs them.
static pthread_t my_thread; static pthread_t my_thread;
static sem_t my_sem_start_is_required;
static sem_t my_sem_stop_is_acknowledged;

static pthread_cond_t my_cond_start_is_required;
static int my_start_is_required = 0;

static pthread_cond_t my_cond_stop_is_acknowledged;
static int my_stop_is_acknowledged = 0;


static void *say_thread(void *); static void *say_thread(void *);


pthread_mutex_init(&my_mutex, (const pthread_mutexattr_t *)NULL); pthread_mutex_init(&my_mutex, (const pthread_mutexattr_t *)NULL);
init(0); init(0);


assert(-1 != sem_init(&my_sem_start_is_required, 0, 0));
assert(-1 != sem_init(&my_sem_stop_is_acknowledged, 0, 0));
assert(-1 != pthread_cond_init(&my_cond_command_is_running, NULL));
assert(-1 != pthread_cond_init(&my_cond_start_is_required, NULL));
assert(-1 != pthread_cond_init(&my_cond_stop_is_acknowledged, NULL));


pthread_attr_t a_attrib; pthread_attr_t a_attrib;
if (pthread_attr_init(&a_attrib) if (pthread_attr_init(&a_attrib)
pthread_attr_destroy(&a_attrib); pthread_attr_destroy(&a_attrib);


// leave once the thread is actually started // leave once the thread is actually started
while ((sem_wait(&my_sem_stop_is_acknowledged) == -1) && errno == EINTR)
assert(-1 != pthread_mutex_lock(&my_mutex));
while ((my_stop_is_acknowledged == 0) || ((pthread_cond_wait(&my_cond_stop_is_acknowledged, &my_mutex) == -1) && errno == EINTR))
continue; // Restart when interrupted by handler continue; // Restart when interrupted by handler
my_stop_is_acknowledged = 0;
pthread_mutex_unlock(&my_mutex);
} }


espeak_ng_STATUS fifo_add_command(t_espeak_command *the_command) espeak_ng_STATUS fifo_add_command(t_espeak_command *the_command)
pthread_mutex_unlock(&my_mutex); pthread_mutex_unlock(&my_mutex);
return status; return status;
} }

if ((status = pthread_mutex_unlock(&my_mutex)) != ENS_OK)
return status;

if (!my_command_is_running) {
// quit when command is actually started
// (for possible forthcoming 'end of command' checks)
sem_post(&my_sem_start_is_required);
int val = 1;
while (val > 0) {
usleep(50000); // TBD: event?
sem_getvalue(&my_sem_start_is_required, &val);
my_start_is_required = 1;
pthread_cond_signal(&my_cond_start_is_required);

while (!my_command_is_running) {
if((status = pthread_cond_wait(&my_cond_command_is_running, &my_mutex)) != ENS_OK && errno != EINTR) {
pthread_mutex_unlock(&my_mutex);
return status;
} }
} }
if ((status = pthread_mutex_unlock(&my_mutex)) != ENS_OK)
return status;


return ENS_OK; return ENS_OK;
} }
return status; return status;
} }


if ((status = pthread_mutex_unlock(&my_mutex)) != ENS_OK)
return status;

if (!my_command_is_running) {
// quit when one command is actually started
// (for possible forthcoming 'end of command' checks)
sem_post(&my_sem_start_is_required);
int val = 1;
while (val > 0) {
usleep(50000); // TBD: event?
sem_getvalue(&my_sem_start_is_required, &val);
my_start_is_required = 1;
pthread_cond_signal(&my_cond_start_is_required);
while (!my_command_is_running) {
if((status = pthread_cond_wait(&my_cond_command_is_running, &my_mutex)) != ENS_OK && errno != EINTR) {
pthread_mutex_unlock(&my_mutex);
return status;
} }
} }
if ((status = pthread_mutex_unlock(&my_mutex)) != ENS_OK)
return status;


return ENS_OK; return ENS_OK;
} }
my_stop_is_required = 1; my_stop_is_required = 1;
} }


if ((status = pthread_mutex_unlock(&my_mutex)) != ENS_OK)
return status;

if (a_command_is_running) { if (a_command_is_running) {
while ((sem_wait(&my_sem_stop_is_acknowledged) == -1) && errno == EINTR)
while ((my_stop_is_acknowledged == 0) || ((pthread_cond_wait(&my_cond_stop_is_acknowledged, &my_mutex) == -1) && errno == EINTR))
continue; // Restart when interrupted by handler continue; // Restart when interrupted by handler
} }


my_stop_is_required = 0; my_stop_is_required = 0;
if ((status = pthread_mutex_unlock(&my_mutex)) != ENS_OK)
return status;


return ENS_OK; return ENS_OK;
} }
{ {
int a_start_is_required = 0; int a_start_is_required = 0;


// Wait for the start request (my_sem_start_is_required).
// Wait for the start request (my_cond_start_is_required).
// Besides this, if the audio stream is still busy, // Besides this, if the audio stream is still busy,
// check from time to time its end. // check from time to time its end.
// The end of the stream is confirmed by several checks // The end of the stream is confirmed by several checks
clock_gettime2(&ts); clock_gettime2(&ts);


add_time_in_ms(&ts, INACTIVITY_TIMEOUT); add_time_in_ms(&ts, INACTIVITY_TIMEOUT);
err = pthread_mutex_lock(&my_mutex);
assert(err != -1);


while ((err = sem_timedwait(&my_sem_start_is_required, &ts)) == -1
while ((err = pthread_cond_timedwait(&my_cond_start_is_required, &my_mutex, &ts)) == -1
&& errno == EINTR) && errno == EINTR)
continue; continue;


if (err == 0) if (err == 0)
a_start_is_required = 1; a_start_is_required = 1;
} }
pthread_mutex_unlock(&my_mutex);
return a_start_is_required; return a_start_is_required;
} }




if (a_stop_is_required) { if (a_stop_is_required) {
// acknowledge the stop request // acknowledge the stop request
a_status = sem_post(&my_sem_stop_is_acknowledged);
if((a_status = pthread_mutex_lock(&my_mutex)) != ENS_OK)
return a_status;

my_stop_is_acknowledged = 1;
a_status = pthread_cond_signal(&my_cond_stop_is_acknowledged);
if(a_status != ENS_OK)
return a_status;
a_status = pthread_mutex_unlock(&my_mutex);
if (status == ENS_OK) if (status == ENS_OK)
status = a_status; status = a_status;
} }
} }


(void)p; // unused (void)p; // unused


// announce that thread is started // announce that thread is started
sem_post(&my_sem_stop_is_acknowledged);
assert(-1 != pthread_mutex_lock(&my_mutex));
my_stop_is_acknowledged = 1;
assert(-1 != pthread_cond_signal(&my_cond_stop_is_acknowledged));
assert(-1 != pthread_mutex_unlock(&my_mutex));


int look_for_inactivity = 0; int look_for_inactivity = 0;


} }
look_for_inactivity = 1; look_for_inactivity = 1;


int a_status = pthread_mutex_lock(&my_mutex);
assert(!a_status);

if (!a_start_is_required) { if (!a_start_is_required) {
while ((sem_wait(&my_sem_start_is_required) == -1) && errno == EINTR)
while ((my_start_is_required == 0) || ((pthread_cond_wait(&my_cond_start_is_required, &my_mutex) == -1) && errno == EINTR))
continue; // Restart when interrupted by handler continue; // Restart when interrupted by handler
} }



my_command_is_running = 1; my_command_is_running = 1;


assert(-1 != pthread_cond_broadcast(&my_cond_command_is_running));
assert(-1 != pthread_mutex_unlock(&my_mutex));

while (my_command_is_running) { while (my_command_is_running) {
int a_status = pthread_mutex_lock(&my_mutex); int a_status = pthread_mutex_lock(&my_mutex);
assert(!a_status); assert(!a_status);
a_status = pthread_mutex_unlock(&my_mutex); a_status = pthread_mutex_unlock(&my_mutex);
my_command_is_running = 0; my_command_is_running = 0;
} else { } else {
// purge start semaphore
while (0 == sem_trywait(&my_sem_start_is_required))
;
my_start_is_required = 0;


if (my_stop_is_required) if (my_stop_is_required)
my_command_is_running = 0; my_command_is_running = 0;


if (my_stop_is_required) { if (my_stop_is_required) {
// no mutex required since the stop command is synchronous // no mutex required since the stop command is synchronous
// and waiting for my_sem_stop_is_acknowledged
// and waiting for my_cond_stop_is_acknowledged
init(1); init(1);


// purge start semaphore
while (0 == sem_trywait(&my_sem_start_is_required))
;
assert(-1 != pthread_mutex_lock(&my_mutex));
my_start_is_required = 0;


// acknowledge the stop request // acknowledge the stop request
int a_status = sem_post(&my_sem_stop_is_acknowledged);
my_stop_is_acknowledged = 1;
int a_status = pthread_cond_signal(&my_cond_stop_is_acknowledged);
assert(a_status != -1); assert(a_status != -1);
pthread_mutex_unlock(&my_mutex);

} }
// and wait for the next start // and wait for the next start
} }
pthread_cancel(my_thread); pthread_cancel(my_thread);
pthread_join(my_thread, NULL); pthread_join(my_thread, NULL);
pthread_mutex_destroy(&my_mutex); pthread_mutex_destroy(&my_mutex);
sem_destroy(&my_sem_start_is_required);
sem_destroy(&my_sem_stop_is_acknowledged);
pthread_cond_destroy(&my_cond_start_is_required);
pthread_cond_destroy(&my_cond_stop_is_acknowledged);


init(0); // purge fifo init(0); // purge fifo
} }

Loading…
Cancel
Save