aboutsummaryrefslogtreecommitdiffstats
path: root/complaints.c
diff options
context:
space:
mode:
authorGuillaume Horel <guillaume.horel@serenitascapital.com>2015-11-04 13:55:26 -0500
committerGuillaume Horel <guillaume.horel@serenitascapital.com>2015-11-04 13:55:26 -0500
commitded46d4768498c7d27fedcc438fe80a59ad63d0c (patch)
tree1158247ec3b9580a3deaa320334c2d777050b6b9 /complaints.c
parenta5309fed914fdaa7697f2d369e7dcd02309063ab (diff)
downloadmrsync-ded46d4768498c7d27fedcc438fe80a59ad63d0c.tar.gz
move code into a src directory
Diffstat (limited to 'complaints.c')
-rw-r--r--complaints.c579
1 files changed, 0 insertions, 579 deletions
diff --git a/complaints.c b/complaints.c
deleted file mode 100644
index fb48da2..0000000
--- a/complaints.c
+++ /dev/null
@@ -1,579 +0,0 @@
-/*
- Copyright (C) 2005-2008 Renaissance Technologies Corp.
- main developer: HP Wei <hp@rentec.com>
- Copyright (C) 2001 Renaissance Technologies Corp.
- main developer: HP Wei <hp@rentec.com>
- This file was modified in 2001 and later from files in the program
- multicaster copyrighted by Aaron Hillegass as found at
- <http://sourceforge.net/projects/multicaster/>
-
- Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2, or (at your option)
- any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; see the file COPYING.
- If not, write to the Free Software Foundation,
- 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-*/
-
-#include "main.h"
-/*#include "paths.h"*/
-#include <sys/types.h>
-#include <time.h>
-#include <setjmp.h>
-#include <signal.h>
-
-extern int monitorID; /* defined in multicaster.c */
-extern int my_FLOW_PORT;
-extern int verbose;
-extern int nPages;
-extern char * cmd_name[];
-extern unsigned int total_pages, real_total_pages;
-extern off_t total_bytes, real_total_bytes;
-
-/* buffer for receiving complaints */
-char flow_buff[FLOW_BUFFSIZE];
-int *code_ptr; /* What's wrong? */
-int *mid_ptr; /* machine id */
-int *file_ptr; /* which file */
-int *npage_ptr; /* # of pages */
-int *pArray_ptr;/* missing page arrary */
-
-/* receive socket */
-int complaint_fd;
-#ifndef IPV6
-struct sockaddr_in complaint_addr;
-#else
-struct sockaddr_in6 complaint_addr;
-#endif
-
-/* status */
-char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */
-int *last_seq; /* array of size nMachines -- seq number of complaints */
- /* *** watch out the max seq = 2e9 */
-int *total_missing_page; /* array of size nMachines -- persistent thru life of program*/
-char *file_received; /* array of size nMachines */
-char *bad_machines; /* array of size nMachines -- persistent thru life of program*/
-char *machine_status; /* array of size nMachines for ack */
-int *missing_pages; /* array of size nMachines */
-
-int nMachines;
-int has_missing; /* some machines have missing pages for this file (a flag)*/
-int has_sick; /* some machines are sick for this file (a flag)*/
-int skip_count=0; /* number of files that are not delivered */
-int quitWithOneBad=FALSE; /* default: continue with one or more (<all) bad machine */
-
-/* flow control */
-int my_ACK_WAIT_TIMES = ACK_WAIT_TIMES;
-
-/*
- init_complaints initializes our buffers to receive complaint information
- from the catchers
-*/
-void init_complaints()
-{
- int rcv_size;
-
- if (verbose>=2)
- fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE);
-
- /* get pointers set to the right place in buffer */
- code_ptr = (int *) flow_buff;
- mid_ptr = (int *) (code_ptr + 1);
- file_ptr = (int *) (mid_ptr + 1);
- npage_ptr = (int *) (file_ptr + 1);
- pArray_ptr= (int *) (npage_ptr+1);
-
- /* Receive socket (the default buffer size is 65535 bytes */
- if (verbose>=2) printf("set up receive socket for complaints\n");
- complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT);
-
- /*
- getsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &i, &il);
- printf(" rcvbuf = %d type = %d\n", i, il);
- exit(0);
- the default in our machines -> size = 65535 and type = 4
- */
-
- rcv_size = TOTAL_REC_PAGE * FLOW_BUFFSIZE;
- if (setsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){
- perror("Expanding receive buffer for init_complaints");
- }
-}
-
-void init_missing_page_flag(int n)
-{
- int i;
- nPages = n;
- if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) {
- fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n);
- perror("error = ");
- exit(0);
- }
- for(i=0; i<nPages; ++i) {
- missing_page_flag[i] = RECEIVED;
- }
-}
-
-void page_sent(int page)
-{
- missing_page_flag[page] = RECEIVED;
-}
-
-void free_missing_page_flag()
-{
- free(missing_page_flag);
- missing_page_flag = NULL;
-}
-
-void set_has_missing()
-{
- has_missing = TRUE;
-}
-
-void reset_has_missing()
-{
- has_missing = FALSE;
-}
-
-void set_has_sick()
-{
- has_sick = TRUE;
-}
-
-void reset_has_sick()
-{
- has_sick = FALSE;
-}
-
-int has_sick_machines()
-{
- return has_sick;
-}
-
-int has_missing_pages()
-{ /* originally, this fx use missing_page_flag[]
- That will not work if the master did not receive missing page info */
- return has_missing;
-}
-
-void refresh_missing_pages()
-{
- int i;
- for(i=0; i<nMachines; ++i) missing_pages[i] = 0;
-}
-
-void refresh_machine_status()
-{
- int i;
- for(i=0; i<nMachines; ++i) machine_status[i] = NOT_READY;
-}
-
-void mod_machine_status()
-{
- /* take into account of machines which have received this file */
- int i;
- for(i=0; i<nMachines; ++i) {
- if (file_received[i]==FILE_RECV) machine_status[i]=MACHINE_OK;
- }
-}
-
-void refresh_file_received()
-{
- int i;
- for(i=0; i<nMachines; ++i) file_received[i] = NOT_RECV;
-}
-
-int nNotRecv()
-{
- int i, c;
- c = 0;
- for(i=0; i<nMachines; ++i) {
- if (file_received[i] == NOT_RECV) ++c;
- }
- return c;
-}
-
-int iNotRecv()
-{
- /* find the first NotRecv machine */
- int i;
- for(i=0; i<nMachines; ++i) {
- if (file_received[i] == NOT_RECV) return i;
- }
- return -1;
-}
-
-void init_machine_status(int n)
-{
- int i;
- nMachines = n;
- machine_status = malloc(n * sizeof(char));
- refresh_machine_status();
-
- bad_machines = malloc(n * sizeof(char));
- for(i=0; i<nMachines; ++i) {
- bad_machines[i] = GOOD_MACHINE;
- }
-
- total_missing_page = malloc(n * sizeof(int));
- for(i=0; i<nMachines; ++i) {
- total_missing_page[i] = 0;
- }
-
- missing_pages = malloc(n * sizeof(int));
- for(i=0; i<nMachines; ++i) {
- missing_pages[i] = 0;
- }
-
- file_received = malloc(n * sizeof(char));
- refresh_file_received();
-
- skip_count = 0;
-
- last_seq = malloc(n * sizeof(int));
- for(i=0; i<nMachines; ++i) {
- last_seq[i] = -1;
- }
-}
-
-int get_total_missing_pages(int n)
-{
- return total_missing_page[n];
-}
-
-int read_handle_complaint(int cmd)
-{
- /*
- cmd = cmd_code -- each cmd expects different 'response' (complaints)
- return 1 for complaint handled
- return 0 for irrelevant complaint
- return -1 for time-out
- */
- int mid_v, code_v, file_v, npage_v, bytes_read;
-
- if (readable(complaint_fd)) {
- /* There is a complaint */
- bytes_read = recvfrom(complaint_fd, flow_buff, FLOW_BUFFSIZE, 0, NULL, NULL);
-
- /* 20060323 deal with big- vs little-endian issue
- convert incoming integers into host representation */
-
- mid_v = ntohl(*mid_ptr);
-
- if ((bytes_read < FLOW_HEAD_SIZE) || (mid_v < 0 ) ||
- (mid_v >= (unsigned int) nMachines) || /* boundary check for mid_v for safety */
- (bad_machines[mid_v] == BAD_MACHINE)) { /* ignore complaint from a bad machine*/
- return 0;
- }
-
- code_v = ntohl(*code_ptr);
- file_v = ntohl(*file_ptr);
- npage_v = ntohl(*npage_ptr);
-
- /* check if the complaint is for the current file */
- if (code_v != MONITOR_OK && file_v != current_entry()) return 0;
- /* out of seq will be ignored */
- if (code_v != MISSING_PAGE && code_v != MISSING_TOTAL) { /********* MISSING_TOTAL ? *************/
- if (npage_v <= last_seq[mid_v]) return 0;
- else last_seq[mid_v] = npage_v;
- }
-
- switch (code_v) {
- case PAGE_RECV:
- /******** check if machineID is the one we have set. */
- /*if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID);*/
- if (cmd == SENDING_DATA && mid_v == monitorID)
- return 1;
- else
- return 0;
-
- case MONITOR_OK:
- /********* check if machineID is the one we have set. */
- if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID);
- if (cmd == SELECT_MONITOR_CMD && mid_v == monitorID)
- return 1;
- else
- return 0;
-
- case OPEN_OK :
- if (cmd == OPEN_FILE_CMD) {
- machine_status[mid_v] = MACHINE_OK;
- return 1;
- } else {
- return 0;
- }
-
- case CLOSE_OK :
- if (cmd == CLOSE_FILE_CMD || cmd == CLOSE_ABORT_CMD) {
- machine_status[mid_v] = MACHINE_OK;
- return 1;
- } else {
- return 0;
- }
-
- case EOF_OK :
- if (cmd == EOF_CMD && file_received[mid_v]==NOT_RECV) {
- machine_status[mid_v] = MACHINE_OK;
- file_received[mid_v] = FILE_RECV;
- return 1;
- } else {
- return 0;
- }
-
- case MISSING_PAGE :
- if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0;
- if (npage_v > nPages) return 0;
- {
- int i, *pi, page_v;
- pi = pArray_ptr;
- for (i = 0; i<npage_v; ++i) {
- page_v = ntohl(pi[i]);
- if (page_v<1 || page_v > nPages) continue; /*** make sure page_v starts with 1*/
- missing_page_flag[page_v-1] = MISSING;
- }
- }
- missing_pages[mid_v] += npage_v;
- set_has_missing();
- return 1;
-
- case MISSING_TOTAL:
- if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV || machine_status[mid_v] == MACHINE_OK)
- return 0;
- /* Consider to add: if npage_v >missing_pages[mid_v], ask to resend
- [ likely no big gain ] */
- total_missing_page[mid_v] += npage_v;
- set_has_missing(); /* store the info about missing info */
- machine_status[mid_v] = MACHINE_OK; /* machine_status serves as ack only */
- return 1;
-
- case SIT_OUT :
- if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0;
- fprintf(stderr, "*** %s sits-out-receiving %s\n",
- id2name(mid_v), getFilename());
- machine_status[mid_v] = MACHINE_OK;
-
- if (!has_sick) ++skip_count;
- set_has_sick();
- return 1;
-
- default :
- if (verbose>=2) fprintf(stderr, "Unknown complaint: code = %d\n", code_v);
- return 0;
- } /* end of switch */
- } /* end of if(readable) */
-
- /* time out of readable() */
- return -1;
-}
-
-int all_machine_ok()
-{
- int i;
- for(i=0; i<nMachines; ++i) {
- if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE)
- return FALSE; /* there is at least one machine that has not sent ack */
- }
- return TRUE; /* all machines are ready */
-}
-
-/* this is for the master to receive the acknowledgement. */
-void wait_for_ok(int code)
-{
- int i, count, resp;
- time_t tloc;
- time_t rtime0, rtime1;
-
- rtime0 = time(&tloc); /* reference time */
-
- count = 0;
- while (!all_machine_ok()) {
- resp = read_handle_complaint(code);
- if (resp==1) { /* if there is a complaint handled */
- rtime0 = time(&tloc); /* reset the reference time */
- continue;
- }
-
- if (resp==0) { /* irrelevant complaint received */
- continue;
- }
-
- /* no complaints handled within the time period set by set_delay() */
- rtime1 = time(&tloc); /* time since last complaints */
- if ((rtime1-rtime0) >= ACK_WAIT_PERIOD) {
- ++count;
- if (count < my_ACK_WAIT_TIMES) {
- if (verbose>=1 && (count % 10 == 0))
- fprintf(stderr, " %d: resend cmd(%s) to machines:[ ", count, cmd_name[code]);
- for(i=0; i<nMachines; ++i) {
- if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) {
- if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "%d ", i);
- send_cmd(code, (int) i);
- usleep(FAST);
- }
- }
- if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "]\n");
- rtime0 = rtime1;
- } else { /* allowable period of time has passed */
- fprintf(stderr, " Drop these bad machines:[ ");
- for(i=0; i<nMachines; ++i) {
- if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) {
- /* fprintf(stderr, "%d ", i); */
- fprintf(stderr, "%s ", id2name(i));
- bad_machines[i]= BAD_MACHINE;
- /*
- The pages sent by the master will be ignored by
- the bad machine, because the current_file nubmer
- does not match.
- */
- }
- }
- fprintf(stderr, "]\n");
- break;
- }
- }
- }
-}
-
-int is_it_missing(int page)
-{
- return (missing_page_flag[page]==MISSING) ? TRUE : FALSE;
-}
-
-int pr_missing_pages()
-{
- int i, N, exit_code=0;
- off_t delta;
- unsigned int dp;
-
- for(i=0; i<nMachines; ++i) {
- char name[PATH_MAX];
- N = get_total_missing_pages(i);
- strcpy(name, id2name(i));
- if (strlen(name)==0) sprintf(name, "machine(%3d)", i);
- fprintf(stderr, "%s: #_missing_page_request = %6.2f%% = %d\n",
- name, (double)N/((double)total_pages)*100.0, N);
- }
-
- if (skip_count>0) {
- fprintf(stderr, "\nWarning: There are %d files which are not delivered.\n", skip_count);
- exit_code = -1;
- }
-
- fprintf(stderr, "\nTotal number of files = %12d Pages w/o ack = %12u (%6.2f%%)\n",
- total_entries(), pages_wo_ack(), (double)pages_wo_ack()/(double)real_total_pages*100.0);
-
- dp = real_total_pages - total_pages;
- fprintf(stderr, "Total number of pages = %12d Pages re-sent = %12u (%6.2f%%)\n",
- total_pages, dp, (double)dp/(double)total_pages*100.0);
-
- delta = (off_t)(real_total_bytes - total_bytes);
- #ifdef _LARGEFILE_SOURCE
- fprintf(stderr, "Total number of bytes = %12llu Bytes re-sent = %12llu (%6.2f%%)\n",
- total_bytes, delta, (double)delta/(double)total_bytes*100.0);
- #else
- fprintf(stderr, "Total number of bytes = %12d Bytes re-sent = %12u (%6.2f%%)\n",
- total_bytes, delta, (double)delta/(double)total_bytes*100.0);
- #endif
-
- return (exit_code);
-}
-
-/* count the number of bad machines */
-int nBadMachines()
-{
- int i, count = 0;
- for(i=0; i<nMachines; ++i) {
- if (bad_machines[i] == BAD_MACHINE) ++count;
- }
-
- return count;
-}
-
-int choose_print_machines(char *stArray, char selection, char * msg_prefix)
-{
- int i, count = 0;
-
- for(i=0; i<nMachines; ++i) {
- char line[PATH_MAX];
- if (stArray[i] == selection) {
- ++count;
- strcpy(line, id2name(i));
- if (count == 1) { fprintf(stderr, msg_prefix); }
- if (strlen(line)==0)
- fprintf(stderr, "%d ", i);
- else
- fprintf(stderr, "%s ", line);
- }
- }
- if (count > 0) fprintf(stderr, "]\n");
- return count;
-}
-
-int send_done_and_pr_msgs(double total_time, double t_page)
-{
- int exit_code1 =0;
- int exit_code2 =0;
- int exit_code3 =0;
-
- send_all_done_cmd();
-
- /* exit_code1 !=0 if there are files that were not delivered due to change or skipped */
- exit_code1 = pr_missing_pages();
-
- fprintf(stderr, "Total time spent = %6.2f (min) ~ %6.2f (min/GB)\n\n",
- total_time, total_time / ((double)real_total_bytes/1.0e9));
- fprintf(stderr, "Send pages time = %6.2f (min) ~ %6.2f (min/GB)\n\n",
- t_page, t_page / ((double)real_total_bytes/1.0e9));
-
- exit_code2 = choose_print_machines(bad_machines,
- BAD_MACHINE,
- "Not synced for bad machines:[ ");
-
- if (quitWithOneBad && nBadMachines() >=1) {
- fprintf(stderr, "We choose to exit when at least one target is bad\n");
- fprintf(stderr, "All files following the current one did not get delivered\n");
- fprintf(stderr, "If resend cmd(CLOSE_FILE), then the current file may have been delivered to non-bad targets\n\n");
- }
-
- if (current_entry() < total_entries()) { /* if we exit prematurely */
- exit_code3 = choose_print_machines(machine_status,
- NOT_READY, "\nNot-ready machines:[ ");
- }
-
- if (verbose>=1) pr_rtt_hist();
- return (exit_code1+exit_code3); /* 200807 removed exit_code2 because bad machines case has been dealt with
- by -q. If no -q, then the bad machines are considered 'harmless' */
-}
-
-/* to do some cleanup before exit IF all machines are bad */
-void do_badMachines_exit()
-{
- if ((quitWithOneBad && nBadMachines() < 1) ||
- (!quitWithOneBad && (nBadMachines() < nMachines))) return;
-
- if (quitWithOneBad)
- fprintf(stderr, "One (or more) machine is bad. Exit!\n");
- else
- fprintf(stderr, "All machines are bad. Exit!\n");
-
- send_done_and_pr_msgs(-1.0, -1.0);
- exit(-1);
-}
-
-void do_cntl_c(int signo)
-{
- fprintf(stderr, "Control_C interrupt detected!\n");
-
- send_done_and_pr_msgs(-1.0, -1.0);
- exit(-1);
-}