Logo Search packages:      
Sourcecode: fuse version File versions  Download package

fuse_loop_mt.c

/*
  FUSE: Filesystem in Userspace
  Copyright (C) 2001-2007  Miklos Szeredi <miklos@szeredi.hu>

  This program can be distributed under the terms of the GNU LGPLv2.
  See the file COPYING.LIB.
*/

#include "fuse_lowlevel.h"
#include "fuse_misc.h"
#include "fuse_kernel.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <semaphore.h>
#include <errno.h>
#include <sys/time.h>

struct fuse_worker {
      struct fuse_worker *prev;
      struct fuse_worker *next;
      pthread_t thread_id;
      size_t bufsize;
      char *buf;
      struct fuse_mt *mt;
};

struct fuse_mt {
      pthread_mutex_t lock;
      int numworker;
      int numavail;
      struct fuse_session *se;
      struct fuse_chan *prevch;
      struct fuse_worker main;
      sem_t finish;
      int exit;
      int error;
};

static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next)
{
      struct fuse_worker *prev = next->prev;
      w->next = next;
      w->prev = prev;
      prev->next = w;
      next->prev = w;
}

static void list_del_worker(struct fuse_worker *w)
{
      struct fuse_worker *prev = w->prev;
      struct fuse_worker *next = w->next;
      prev->next = next;
      next->prev = prev;
}

static int fuse_start_thread(struct fuse_mt *mt);

static void *fuse_do_work(void *data)
{
      struct fuse_worker *w = (struct fuse_worker *) data;
      struct fuse_mt *mt = w->mt;

      while (!fuse_session_exited(mt->se)) {
            int isforget = 0;
            struct fuse_chan *ch = mt->prevch;
            int res = fuse_chan_recv(&ch, w->buf, w->bufsize);
            if (res == -EINTR)
                  continue;
            if (res <= 0) {
                  if (res < 0) {
                        fuse_session_exit(mt->se);
                        mt->error = -1;
                  }
                  break;
            }

            pthread_mutex_lock(&mt->lock);
            if (mt->exit) {
                  pthread_mutex_unlock(&mt->lock);
                  return NULL;
            }

            /*
             * This disgusting hack is needed so that zillions of threads
             * are not created on a burst of FORGET messages
             */
            if (((struct fuse_in_header *) w->buf)->opcode == FUSE_FORGET)
                  isforget = 1;

            if (!isforget)
                  mt->numavail--;
            if (mt->numavail == 0)
                  fuse_start_thread(mt);
            pthread_mutex_unlock(&mt->lock);

            fuse_session_process(mt->se, w->buf, res, ch);

            pthread_mutex_lock(&mt->lock);
            if (!isforget)
                  mt->numavail++;
            if (mt->numavail > 10) {
                  if (mt->exit) {
                        pthread_mutex_unlock(&mt->lock);
                        return NULL;
                  }
                  list_del_worker(w);
                  mt->numavail--;
                  mt->numworker--;
                  pthread_mutex_unlock(&mt->lock);

                  pthread_detach(w->thread_id);
                  free(w->buf);
                  free(w);
                  return NULL;
            }
            pthread_mutex_unlock(&mt->lock);
      }

      sem_post(&mt->finish);
      pause();

      return NULL;
}

static int fuse_start_thread(struct fuse_mt *mt)
{
      sigset_t oldset;
      sigset_t newset;
      int res;
      struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
      if (!w) {
            fprintf(stderr, "fuse: failed to allocate worker structure\n");
            return -1;
      }
      memset(w, 0, sizeof(struct fuse_worker));
      w->bufsize = fuse_chan_bufsize(mt->prevch);
      w->buf = malloc(w->bufsize);
      w->mt = mt;
      if (!w->buf) {
            fprintf(stderr, "fuse: failed to allocate read buffer\n");
            free(w);
            return -1;
      }

      /* Disallow signal reception in worker threads */
      sigemptyset(&newset);
      sigaddset(&newset, SIGTERM);
      sigaddset(&newset, SIGINT);
      sigaddset(&newset, SIGHUP);
      sigaddset(&newset, SIGQUIT);
      pthread_sigmask(SIG_BLOCK, &newset, &oldset);
      res = pthread_create(&w->thread_id, NULL, fuse_do_work, w);
      pthread_sigmask(SIG_SETMASK, &oldset, NULL);
      if (res != 0) {
            fprintf(stderr, "fuse: error creating thread: %s\n",
                  strerror(res));
            free(w->buf);
            free(w);
            return -1;
      }
      list_add_worker(w, &mt->main);
      mt->numavail ++;
      mt->numworker ++;

      return 0;
}

static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
{
      pthread_join(w->thread_id, NULL);
      pthread_mutex_lock(&mt->lock);
      list_del_worker(w);
      pthread_mutex_unlock(&mt->lock);
      free(w->buf);
      free(w);
}

00182 int fuse_session_loop_mt(struct fuse_session *se)
{
      int err;
      struct fuse_mt mt;
      struct fuse_worker *w;

      memset(&mt, 0, sizeof(struct fuse_mt));
      mt.se = se;
      mt.prevch = fuse_session_next_chan(se, NULL);
      mt.error = 0;
      mt.numworker = 0;
      mt.numavail = 0;
      mt.main.thread_id = pthread_self();
      mt.main.prev = mt.main.next = &mt.main;
      sem_init(&mt.finish, 0, 0);
      fuse_mutex_init(&mt.lock);

      pthread_mutex_lock(&mt.lock);
      err = fuse_start_thread(&mt);
      pthread_mutex_unlock(&mt.lock);
      if (!err) {
            /* sem_wait() is interruptible */
            while (!fuse_session_exited(se))
                  sem_wait(&mt.finish);

            for (w = mt.main.next; w != &mt.main; w = w->next)
                  pthread_cancel(w->thread_id);
            mt.exit = 1;
            pthread_mutex_unlock(&mt.lock);

            while (mt.main.next != &mt.main)
                  fuse_join_worker(&mt, mt.main.next);

            err = mt.error;
      }

      pthread_mutex_destroy(&mt.lock);
      sem_destroy(&mt.finish);
      fuse_session_reset(se);
      return err;
}

Generated by  Doxygen 1.6.0   Back to index