Logo Search packages:      
Sourcecode: maqview version File versions  Download package

socket_view.c

#include "socket_view.h"

void custom_socket(int sock){
      int yes, no;
      yes = 1;
      no  = 0;
      setsockopt(sock, SOL_SOCKET, SO_DEBUG, &yes, sizeof(int));
      setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(int));
}

int read_full(int fds, void *buffer, size_t size){
      int num, count;
      count = 0;
      while(1){
            num = read(fds, buffer + count, size - count);
            if(num < 0){
                  break;
            } else {
                  count += num;
                  if(count == size) break;
            }
      }
      return count;
}

#define my_write(fds, val) if(write(fds, &(val), sizeof(val)) != sizeof(val)) return -1
#define my_write_full(fds, buffer, size) if(write(fds, buffer, size) != size) return -1

int request_view_basement_info(int sock){
      const unsigned char op = VIEW_OP_INFO;
      my_write(sock, op);
      return 1;
}

int send_view_basement_info(int sock, RefSeq *src){
      int i, size;
      my_write(sock, src->cache->n_ref);
      for(i=0;i<src->cache->n_ref;i++){
            my_write(sock, src->cache->ref_lengths[i]);
            size = strlen(src->cache->index->mm->ref_name[i]);
            my_write(sock, size);
            my_write_full(sock, src->cache->index->mm->ref_name[i], size);
      }
      return 1;
}

#define my_read(fds, val) if(read_full(fds, &(val), sizeof(val)) != sizeof(val)) return NULL
#define my_read_full(fds, buffer, size) if(read_full(fds, buffer, size) != size) return NULL

#define my_read2(fds, val) if(read_full(fds, &(val), sizeof(val)) != sizeof(val)) return -1
#define my_read_full2(fds, buffer, size) if(read_full(fds, buffer, size) != size) return -1

ViewInfo* recv_view_basement_info(int sock){
      ViewInfo *info;
      int i, len;
      info = (ViewInfo*)malloc(sizeof(ViewInfo));
      my_read(sock, info->n_ref);
      info->ref_names   = (char**)malloc(sizeof(char*) * info->n_ref);
      info->ref_lengths = (int64_t*)malloc(sizeof(int64_t) * info->n_ref);
      for(i=0;i<info->n_ref;i++){
            my_read(sock, info->ref_lengths[i]);
            my_read(sock, len);
            info->ref_names[i] = (char*)malloc(len + 1);
            my_read_full(sock, info->ref_names[i], len);
            info->ref_names[i][len] = 0;
      }
      info->last_code = 0;
      info->ref_id    = 0;
      info->start     = 0;
      info->end       = -1;

      info->rd_size   = 0;
      info->rd_cap    = 1024;
      info->reads     = (Read*)malloc(sizeof(Read) * info->rd_cap);
      info->cns_size   = 0;
      info->cns_cap    = 1024;
      info->seqs       = (cns_t*)malloc(sizeof(cns_t) * info->cns_cap);
      return info;
}

int request_maq_data(int sock, int last_code, int ref_id, int64_t start, int64_t end){
      const unsigned char op = VIEW_OP_FETCH;
      my_write(sock, op);
      my_write(sock, last_code);
      my_write(sock, ref_id);
      my_write(sock, start);
      my_write(sock, end);
      return 1;
}

int send_maq_data(int sock, RefSeq *refs, int code, int64_t start, int64_t end){
      int s, e, size, i;
      int64_t pos, m;
      my_write(sock, code);
      my_write(sock, refs->cache->ref_id);
      i = 0;
      while(i<refs->cache->size-1 && read_pos(refs->cache->reads[i].read.pos) < start){ i++; }
      s = i;
      while(i<refs->cache->size-2 && read_pos(refs->cache->reads[i+1].read.pos) <= end) i++;
      e = i;
      size = e - s + 1;
      if(size > 0){
            pos = read_pos(refs->cache->reads[s].read.pos);
            my_write(sock, pos);
            pos = read_pos(refs->cache->reads[e].read.pos);
            my_write(sock, pos);
            my_write(sock, size);
            my_write_full(sock, refs->cache->reads + refs->cache->offset + s, sizeof(Read) * size);
            m = start - refs->cns->start;
            size = refs->cns->size - m;
            my_write(sock, size);
            my_write_full(sock, refs->cns->seqs + refs->cns->offset + m, sizeof(cns_t) * (size));
      } else {
            size = 0;
            my_write(sock, start);
            my_write(sock, start);
            my_write(sock, size);
            my_write(sock, size);
      }
      return 1;
}

int recv_maq_data(int sock, ViewInfo *info){
      my_read2(sock, info->last_code);
      my_read2(sock, info->ref_id);
      my_read2(sock, info->start);
      my_read2(sock, info->end);
      my_read2(sock, info->rd_size);
      if(info->rd_size >= info->rd_cap){
            info->rd_cap = info->rd_size;
            info->reads  = (Read*)realloc(info->reads, sizeof(Read) * info->rd_cap);
      }
      if(info->rd_size) my_read_full2(sock, info->reads, sizeof(Read) * info->rd_size);
      my_read2(sock, info->cns_size);
      if(info->cns_size >= info->cns_cap){
            info->cns_cap = info->cns_size;
            info->seqs    = (cns_t*)realloc(info->seqs, sizeof(cns_t) * info->cns_cap);
      }
      if(info->cns_size) my_read_full2(sock, info->seqs, sizeof(cns_t) * info->cns_size);
      return 1;
}

ViewServer* createViewServer(const char *map_file, const char *cns_file, int listen_port, int max_conns){
      ViewServer *server;
      MapIndex *index;
      RefSeq *refs;
      int i, j;
      struct sockaddr_in name;
      if(max_conns < 1) return NULL;
      server = (ViewServer*)malloc(sizeof(ViewServer));
      server->v_max  = max_conns;
      server->srcs   = (RefSeq**)malloc(sizeof(RefSeq*) * server->v_max);
      for(i=0;i<server->v_max;i++){
            index = load_map_index(map_file, 0);
            if(index == NULL){ return NULL; }
            refs = (RefSeq*)malloc(sizeof(RefSeq));
            refs->cache = read_cache_init(index);
            refs->show_id    = 0;
            refs->show_start = 0;
            refs->tooltips   = NULL;
            refs->cns = NULL;
            if(cns_file) refs->cns = open_cns_cache(cns_file);
            if(refs->cns == NULL) refs->cns = cns_cache_init();
            else {
                  justify_cns_cache(refs->cns, refs->cache->index->mm->ref_name, refs->cache->index->mm->n_ref);
                  for(j=0;j<refs->cache->index->mm->n_ref;j++){
                        refs->cache->ref_lengths[j] = refs->cns->ref_lengths[j];
                        if(refs->cache->index->trees[j]->right < refs->cns->ref_lengths[j]){
                              refs->cache->index->trees[j]->right = refs->cns->ref_lengths[j];
                        }
                  }
            }
            server->srcs[i] = refs;
      }
      server->socket   = -1;

      server->socket = socket(PF_INET, SOCK_STREAM, 0);
      if(server->socket < 0){
            perror("socket");
            freeViewServer(server);
            return NULL;
      }
      name.sin_family = AF_INET;
      name.sin_port = htons(listen_port);
      name.sin_addr.s_addr = htonl(INADDR_ANY);
      if(bind(server->socket, (struct sockaddr *)&name, sizeof(name)) < 0){
            perror("bind");
            freeViewServer(server);
            return NULL;
      }
      server->stop  = 1;
      server->debug = 0;
      return server;
}

int service_core(ViewServer *server, int sock){
      unsigned char op;
      int i, ref_id, idx, last_idx, ret;
      int64_t start, end;
      RefSeq *refs;
      switch(read(sock, &op, 1)){
            case 1: break;
            case 0: 
            default:
                  return -1;
      }
      switch(op){
            case VIEW_OP_CLOSE:
                  if(server->debug){
                        fprintf(stderr, " -- Server recv VIEW_OP_CLOSE in %s -- %s:%d --\n", __FUNCTION__, __FILE__, __LINE__);
                  }
                  return -1;
            case VIEW_OP_INFO:
                  if(server->debug){
                        fprintf(stderr, " -- Server recv VIEW_OP_INFO in %s -- %s:%d --\n", __FUNCTION__, __FILE__, __LINE__);
                  }
                  return send_view_basement_info(sock, server->srcs[0]);
            case VIEW_OP_FETCH:
                  if(server->debug){
                        fprintf(stderr, " -- Server recv VIEW_OP_FETCH in %s -- %s:%d --\n", __FUNCTION__, __FILE__, __LINE__);
                  }
                  my_read2(sock, last_idx);
                  my_read2(sock, ref_id);
                  my_read2(sock, start);
                  my_read2(sock, end);
                  if(last_idx < 0 || last_idx >= server->v_max) return -1;
                  refs = server->srcs[last_idx];
                  idx = last_idx;
                  for(i=0;i<server->v_max;i++){
                        if(i == last_idx) continue;
                        if(server->srcs[i]->show_id == ref_id){
                              if(refs->show_id != ref_id){
                                    refs = server->srcs[i];
                                    idx = i;
                              } else {
                                    if(server->srcs[i]->show_start >= start && server->srcs[i]->show_start <= start + 32 * 1024){
                                          if(refs->show_start >= start && refs->show_start <= start + 32 * 1024){
                                                if(server->srcs[i]->show_start < refs->show_start){
                                                      refs = server->srcs[i];
                                                      idx = i;
                                                }
                                          } else {
                                                refs = server->srcs[i];
                                                idx = i;
                                          }
                                    }
                              }
                        }
                  }
                  ret = read_cache_put(refs->cache, ref_id, start, end);
                  if(ret < 0) return -1;
                  refs->show_id = ref_id;
                  refs->show_start = start;
                  if(refs->cns->stream && ref_id >= 0 && ref_id < refs->cns->n_mm_ref && refs->cns->mm_map[ref_id] >= 0){
                        cns_cache_put(refs->cns, refs->cns->mm_map[ref_id], start, end);
                  } else {
                        cns_cache_guess(refs->cns, refs->cache);
                  }
                  if(server->debug){
                        fprintf(stderr, " -- Server fetch %d %ld %ld in %s -- %s:%d --\n", ref_id, start, end, __FUNCTION__, __FILE__, __LINE__);
                  }
                  return send_maq_data(sock, refs, idx, start, end);
            default:
                  return 0;
      }
}

int runViewServer(ViewServer *server){
      int i, sock, state;
      size_t size;
      fd_set active_fd_set, read_fd_set;
      struct timeval timeout;
      struct sockaddr_in clientname;
      if(listen(server->socket, server->v_max) < 0){
            perror("listen");
            return -1;
      }
      FD_ZERO(&active_fd_set);
      FD_SET(server->socket, &active_fd_set);
      server->stop = 0;
      while(!server->stop){
            read_fd_set = active_fd_set;
            timeout.tv_sec  = 5;
            timeout.tv_usec = 0;
            state = select(FD_SETSIZE, &read_fd_set, NULL, NULL, &timeout);
            if(state != 1) continue;
            for(i=0;i<FD_SETSIZE;i++){
                  if(FD_ISSET(i, &read_fd_set)){
                        if(i == server->socket){
                              size = sizeof(clientname);
                              sock = accept(server->socket, (struct sockaddr *)&clientname, &size);
                              if(sock < 0){
                                    perror("accept");
                                    continue;
                              }
                              if(server->debug){
                                    fprintf(stdout, "Server: connect from host %s, port %hd client_id(%d).\n",
                                           inet_ntoa(clientname.sin_addr),
                                           ntohs(clientname.sin_port), sock);
                              }
                              custom_socket(sock);
                              FD_SET(sock, &active_fd_set);
                        } else {
                              switch(service_core(server, i)){
                                    case VIEW_RES_ERROR:
                                          close(i);
                                          if(server->debug){
                                                fprintf(stderr, " -- Close client(%d) in %s -- %s:%d --\n", i, __FUNCTION__, __FILE__, __LINE__);
                                          }
                                          FD_CLR(i, &active_fd_set);
                                          break;
                              }
                        }
                  }
            }
      }
      for(i=0;i<FD_SETSIZE;i++){
            if(i == server->socket) continue;
            if(!FD_ISSET(i, &active_fd_set)) continue;
            close(i);
      }
      return 0;
}

void stopViewServer(ViewServer *server){
      server->stop = 1;
}

void freeViewServer(ViewServer *server){
      int i;
      if(server->socket >= 0) close(server->socket);
      for(i=0;i<server->v_max;i++){
            read_cache_free(server->srcs[i]->cache);
            close_cns_cache(server->srcs[i]->cns);
            free(server->srcs[i]);
      }
      free(server->srcs);
      free(server);
}

void freeViewInfo(ViewInfo *info){
      int i;
      free(info->ref_lengths);
      free(info->reads);
      free(info->seqs);
      for(i=0;i<info->n_ref;i++){
            free(info->ref_names[i]);
      }
      free(info->ref_names);
      free(info);
}

void freeViewClient(ViewClient *client){
      if(client->info) freeViewInfo(client->info);
      free(client);
}

ViewClient* connectViewClient(char *host, int port){
      ViewClient *client;
      int sock;
      struct sockaddr_in name;
      struct hostent *hostinfo;
      sock = socket(PF_INET, SOCK_STREAM, 0);
      if(sock < 0){
            perror("socket");
            return NULL;
      }
      custom_socket(sock);
      name.sin_family = AF_INET;
      name.sin_port = htons (port);
      hostinfo = gethostbyname (host);
      if (hostinfo == NULL){
            fprintf (stderr, "Unknown host %s:%d.\n", host, port);
            return NULL;
      }
      name.sin_addr = *(struct in_addr *) hostinfo->h_addr;
      if(connect(sock, (struct sockaddr *) &name, sizeof(name)) < 0){
            perror("connect");
            return NULL;
      }
      client = (ViewClient*)malloc(sizeof(ViewClient));
      client->socket = sock;
      if(request_view_basement_info(client->socket) == -1){
            free(client);
            perror("write");
            return NULL;
      }
      client->info = recv_view_basement_info(client->socket);
      if(client->info == NULL){
            free(client);
            return NULL;
      }
      return client;
}

int fetch_maq_data(ViewClient *client, int ref_id, int64_t start, int64_t end){
      if(request_maq_data(client->socket, client->info->last_code, ref_id, start, end) == VIEW_RES_ERROR) return VIEW_RES_ERROR;
      if(recv_maq_data(client->socket, client->info) == VIEW_RES_ERROR) return VIEW_RES_ERROR;
      return 1;
}

void closeViewClient(ViewClient *client){
      const unsigned char op = VIEW_OP_CLOSE;
      if(client->socket >= 0){
            write(client->socket, &op, sizeof(op));
            close(client->socket);
      }
      freeViewClient(client);
}

void socketview_usage(char *prog){
      printf(
      "Setup Server: %s -s  <port> <map file> [<cns file> [<width> [<height>]]]\n"
      "Run Client  : %s -c  <host> <port> [<ref_id> <start> <end>]\n", prog, prog);
      exit(0);
}

int print_map(char* ref_name, maplet *m1){
      int j;
      FILE *fpout;
      fpout = stdout;
      fprintf(fpout, "%s\t%s\t%d\t%c\t%d\t%u\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t",
                  m1->name, ref_name, (m1->pos>>1) + 1,
                  (m1->pos&1)? '-' : '+', m1->dist, m1->flag, m1->map_qual, (signed char)m1->seq[MAX_READLEN-1],
                  m1->alt_qual, m1->info1&0xf, m1->info2, m1->c[0], m1->c[1], m1->size);
      for (j = 0; j != m1->size; ++j) {
            if (m1->seq[j] == 0) fputc('n', fpout);
            else if ((m1->seq[j]&0x3f) < 27) fputc("acgt"[m1->seq[j]>>6&3], fpout);
            else fputc("ACGT"[m1->seq[j]>>6&3], fpout);
      }
      fputc('\t', fpout);
      for (j = 0; j != m1->size; ++j)
            fputc((m1->seq[j]&0x3f) + 33, fpout);
      fputc('\n', fpout);
      return 1;
}

int main(int argc, char **argv){
      ViewServer *server;
      ViewClient *client;
      int i, c;
      int ref_id;
      int64_t start, end;
      char *host, *map_file, *cns_file;
      int port;
      c = 0;
      c = getopt(argc, argv, "sc");
      if(c == -1) socketview_usage(argv[0]);
      host = map_file = cns_file = NULL;
      if(c == 's'){
            if(argc > optind) port = atoi(argv[optind++]);
            else socketview_usage(argv[0]);
            if(argc > optind) map_file = argv[optind++];
            else socketview_usage(argv[0]);
            if(argc > optind) cns_file = argv[optind++];
            else cns_file = NULL;
            server = createViewServer(map_file, cns_file, port, 6);
            server->debug = 1;
            if(server == NULL){
                  printf("Cannot create View\n");
                  return 1;
            }
            runViewServer(server);
            exit(0);
      } else if(c == 'c'){
            if(argc > optind) host = argv[optind++];
            else socketview_usage(argv[0]);
            if(argc > optind) port = atoi(argv[optind++]);
            else socketview_usage(argv[0]);
            if(argc > optind) ref_id = atoi(argv[optind++]);
            else ref_id = -1;
            if(argc > optind) start = atol(argv[optind++]) - 1;
            else start = 0;
            if(argc > optind) end   = atol(argv[optind++]) - 1;
            else end   = 0;
            client = connectViewClient(host, port);
            if(client == NULL){
                  fprintf(stderr, "Cannot connect to %s:%d\n", host, port);
                  return 1;
            }
            if(ref_id == -1){
                  printf("Number of references: %d\n", client->info->n_ref);
                  for(i=0;i<client->info->n_ref;i++){
                        printf("%d\t%s\t%ld\n", i, client->info->ref_names[i], client->info->ref_lengths[i]);
                  }
            } else {
                  if(fetch_maq_data(client, ref_id, start, end) == -1){
                        fprintf(stderr, "Error in fetching maq data\n");
                  } else {
                        for(i=0;i<client->info->cns_size;i++){
                              printf("%c", cns_get_ref(client->info->seqs[i]));
                        }
                        printf("\n");
                        for(i=0;i<client->info->cns_size;i++){
                              printf("%c", cns_get_cns(client->info->seqs[i]));
                        }
                        printf("\n");
                        for(i=0;i<client->info->rd_size;i++){
                              print_map(client->info->ref_names[client->info->ref_id], &(client->info->reads[i].read));
                        }
                  }
            }
            closeViewClient(client);
      } else {
            printf("Unknown option %c\n", c);
            socketview_usage(argv[0]);
      }
      return 0;
}


Generated by  Doxygen 1.6.0   Back to index