diff options
| author | Guillaume Horel <guillaume.horel@serenitascapital.com> | 2015-11-04 13:55:26 -0500 |
|---|---|---|
| committer | Guillaume Horel <guillaume.horel@serenitascapital.com> | 2015-11-04 13:55:26 -0500 |
| commit | ded46d4768498c7d27fedcc438fe80a59ad63d0c (patch) | |
| tree | 1158247ec3b9580a3deaa320334c2d777050b6b9 /complaints.c | |
| parent | a5309fed914fdaa7697f2d369e7dcd02309063ab (diff) | |
| download | mrsync-ded46d4768498c7d27fedcc438fe80a59ad63d0c.tar.gz | |
move code into a src directory
Diffstat (limited to 'complaints.c')
| -rw-r--r-- | complaints.c | 579 |
1 files changed, 0 insertions, 579 deletions
diff --git a/complaints.c b/complaints.c deleted file mode 100644 index fb48da2..0000000 --- a/complaints.c +++ /dev/null @@ -1,579 +0,0 @@ -/* - Copyright (C) 2005-2008 Renaissance Technologies Corp. - main developer: HP Wei <hp@rentec.com> - Copyright (C) 2001 Renaissance Technologies Corp. - main developer: HP Wei <hp@rentec.com> - This file was modified in 2001 and later from files in the program - multicaster copyrighted by Aaron Hillegass as found at - <http://sourceforge.net/projects/multicaster/> - - Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2, or (at your option) - any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; see the file COPYING. - If not, write to the Free Software Foundation, - 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -*/ - -#include "main.h" -/*#include "paths.h"*/ -#include <sys/types.h> -#include <time.h> -#include <setjmp.h> -#include <signal.h> - -extern int monitorID; /* defined in multicaster.c */ -extern int my_FLOW_PORT; -extern int verbose; -extern int nPages; -extern char * cmd_name[]; -extern unsigned int total_pages, real_total_pages; -extern off_t total_bytes, real_total_bytes; - -/* buffer for receiving complaints */ -char flow_buff[FLOW_BUFFSIZE]; -int *code_ptr; /* What's wrong? */ -int *mid_ptr; /* machine id */ -int *file_ptr; /* which file */ -int *npage_ptr; /* # of pages */ -int *pArray_ptr;/* missing page arrary */ - -/* receive socket */ -int complaint_fd; -#ifndef IPV6 -struct sockaddr_in complaint_addr; -#else -struct sockaddr_in6 complaint_addr; -#endif - -/* status */ -char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */ -int *last_seq; /* array of size nMachines -- seq number of complaints */ - /* *** watch out the max seq = 2e9 */ -int *total_missing_page; /* array of size nMachines -- persistent thru life of program*/ -char *file_received; /* array of size nMachines */ -char *bad_machines; /* array of size nMachines -- persistent thru life of program*/ -char *machine_status; /* array of size nMachines for ack */ -int *missing_pages; /* array of size nMachines */ - -int nMachines; -int has_missing; /* some machines have missing pages for this file (a flag)*/ -int has_sick; /* some machines are sick for this file (a flag)*/ -int skip_count=0; /* number of files that are not delivered */ -int quitWithOneBad=FALSE; /* default: continue with one or more (<all) bad machine */ - -/* flow control */ -int my_ACK_WAIT_TIMES = ACK_WAIT_TIMES; - -/* - init_complaints initializes our buffers to receive complaint information - from the catchers -*/ -void init_complaints() -{ - int rcv_size; - - if (verbose>=2) - fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE); - - /* get pointers set to the right place in buffer */ - code_ptr = (int *) flow_buff; - mid_ptr = (int *) (code_ptr + 1); - file_ptr = (int *) (mid_ptr + 1); - npage_ptr = (int *) (file_ptr + 1); - pArray_ptr= (int *) (npage_ptr+1); - - /* Receive socket (the default buffer size is 65535 bytes */ - if (verbose>=2) printf("set up receive socket for complaints\n"); - complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT); - - /* - getsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &i, &il); - printf(" rcvbuf = %d type = %d\n", i, il); - exit(0); - the default in our machines -> size = 65535 and type = 4 - */ - - rcv_size = TOTAL_REC_PAGE * FLOW_BUFFSIZE; - if (setsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){ - perror("Expanding receive buffer for init_complaints"); - } -} - -void init_missing_page_flag(int n) -{ - int i; - nPages = n; - if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) { - fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n); - perror("error = "); - exit(0); - } - for(i=0; i<nPages; ++i) { - missing_page_flag[i] = RECEIVED; - } -} - -void page_sent(int page) -{ - missing_page_flag[page] = RECEIVED; -} - -void free_missing_page_flag() -{ - free(missing_page_flag); - missing_page_flag = NULL; -} - -void set_has_missing() -{ - has_missing = TRUE; -} - -void reset_has_missing() -{ - has_missing = FALSE; -} - -void set_has_sick() -{ - has_sick = TRUE; -} - -void reset_has_sick() -{ - has_sick = FALSE; -} - -int has_sick_machines() -{ - return has_sick; -} - -int has_missing_pages() -{ /* originally, this fx use missing_page_flag[] - That will not work if the master did not receive missing page info */ - return has_missing; -} - -void refresh_missing_pages() -{ - int i; - for(i=0; i<nMachines; ++i) missing_pages[i] = 0; -} - -void refresh_machine_status() -{ - int i; - for(i=0; i<nMachines; ++i) machine_status[i] = NOT_READY; -} - -void mod_machine_status() -{ - /* take into account of machines which have received this file */ - int i; - for(i=0; i<nMachines; ++i) { - if (file_received[i]==FILE_RECV) machine_status[i]=MACHINE_OK; - } -} - -void refresh_file_received() -{ - int i; - for(i=0; i<nMachines; ++i) file_received[i] = NOT_RECV; -} - -int nNotRecv() -{ - int i, c; - c = 0; - for(i=0; i<nMachines; ++i) { - if (file_received[i] == NOT_RECV) ++c; - } - return c; -} - -int iNotRecv() -{ - /* find the first NotRecv machine */ - int i; - for(i=0; i<nMachines; ++i) { - if (file_received[i] == NOT_RECV) return i; - } - return -1; -} - -void init_machine_status(int n) -{ - int i; - nMachines = n; - machine_status = malloc(n * sizeof(char)); - refresh_machine_status(); - - bad_machines = malloc(n * sizeof(char)); - for(i=0; i<nMachines; ++i) { - bad_machines[i] = GOOD_MACHINE; - } - - total_missing_page = malloc(n * sizeof(int)); - for(i=0; i<nMachines; ++i) { - total_missing_page[i] = 0; - } - - missing_pages = malloc(n * sizeof(int)); - for(i=0; i<nMachines; ++i) { - missing_pages[i] = 0; - } - - file_received = malloc(n * sizeof(char)); - refresh_file_received(); - - skip_count = 0; - - last_seq = malloc(n * sizeof(int)); - for(i=0; i<nMachines; ++i) { - last_seq[i] = -1; - } -} - -int get_total_missing_pages(int n) -{ - return total_missing_page[n]; -} - -int read_handle_complaint(int cmd) -{ - /* - cmd = cmd_code -- each cmd expects different 'response' (complaints) - return 1 for complaint handled - return 0 for irrelevant complaint - return -1 for time-out - */ - int mid_v, code_v, file_v, npage_v, bytes_read; - - if (readable(complaint_fd)) { - /* There is a complaint */ - bytes_read = recvfrom(complaint_fd, flow_buff, FLOW_BUFFSIZE, 0, NULL, NULL); - - /* 20060323 deal with big- vs little-endian issue - convert incoming integers into host representation */ - - mid_v = ntohl(*mid_ptr); - - if ((bytes_read < FLOW_HEAD_SIZE) || (mid_v < 0 ) || - (mid_v >= (unsigned int) nMachines) || /* boundary check for mid_v for safety */ - (bad_machines[mid_v] == BAD_MACHINE)) { /* ignore complaint from a bad machine*/ - return 0; - } - - code_v = ntohl(*code_ptr); - file_v = ntohl(*file_ptr); - npage_v = ntohl(*npage_ptr); - - /* check if the complaint is for the current file */ - if (code_v != MONITOR_OK && file_v != current_entry()) return 0; - /* out of seq will be ignored */ - if (code_v != MISSING_PAGE && code_v != MISSING_TOTAL) { /********* MISSING_TOTAL ? *************/ - if (npage_v <= last_seq[mid_v]) return 0; - else last_seq[mid_v] = npage_v; - } - - switch (code_v) { - case PAGE_RECV: - /******** check if machineID is the one we have set. */ - /*if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID);*/ - if (cmd == SENDING_DATA && mid_v == monitorID) - return 1; - else - return 0; - - case MONITOR_OK: - /********* check if machineID is the one we have set. */ - if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID); - if (cmd == SELECT_MONITOR_CMD && mid_v == monitorID) - return 1; - else - return 0; - - case OPEN_OK : - if (cmd == OPEN_FILE_CMD) { - machine_status[mid_v] = MACHINE_OK; - return 1; - } else { - return 0; - } - - case CLOSE_OK : - if (cmd == CLOSE_FILE_CMD || cmd == CLOSE_ABORT_CMD) { - machine_status[mid_v] = MACHINE_OK; - return 1; - } else { - return 0; - } - - case EOF_OK : - if (cmd == EOF_CMD && file_received[mid_v]==NOT_RECV) { - machine_status[mid_v] = MACHINE_OK; - file_received[mid_v] = FILE_RECV; - return 1; - } else { - return 0; - } - - case MISSING_PAGE : - if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0; - if (npage_v > nPages) return 0; - { - int i, *pi, page_v; - pi = pArray_ptr; - for (i = 0; i<npage_v; ++i) { - page_v = ntohl(pi[i]); - if (page_v<1 || page_v > nPages) continue; /*** make sure page_v starts with 1*/ - missing_page_flag[page_v-1] = MISSING; - } - } - missing_pages[mid_v] += npage_v; - set_has_missing(); - return 1; - - case MISSING_TOTAL: - if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV || machine_status[mid_v] == MACHINE_OK) - return 0; - /* Consider to add: if npage_v >missing_pages[mid_v], ask to resend - [ likely no big gain ] */ - total_missing_page[mid_v] += npage_v; - set_has_missing(); /* store the info about missing info */ - machine_status[mid_v] = MACHINE_OK; /* machine_status serves as ack only */ - return 1; - - case SIT_OUT : - if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0; - fprintf(stderr, "*** %s sits-out-receiving %s\n", - id2name(mid_v), getFilename()); - machine_status[mid_v] = MACHINE_OK; - - if (!has_sick) ++skip_count; - set_has_sick(); - return 1; - - default : - if (verbose>=2) fprintf(stderr, "Unknown complaint: code = %d\n", code_v); - return 0; - } /* end of switch */ - } /* end of if(readable) */ - - /* time out of readable() */ - return -1; -} - -int all_machine_ok() -{ - int i; - for(i=0; i<nMachines; ++i) { - if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) - return FALSE; /* there is at least one machine that has not sent ack */ - } - return TRUE; /* all machines are ready */ -} - -/* this is for the master to receive the acknowledgement. */ -void wait_for_ok(int code) -{ - int i, count, resp; - time_t tloc; - time_t rtime0, rtime1; - - rtime0 = time(&tloc); /* reference time */ - - count = 0; - while (!all_machine_ok()) { - resp = read_handle_complaint(code); - if (resp==1) { /* if there is a complaint handled */ - rtime0 = time(&tloc); /* reset the reference time */ - continue; - } - - if (resp==0) { /* irrelevant complaint received */ - continue; - } - - /* no complaints handled within the time period set by set_delay() */ - rtime1 = time(&tloc); /* time since last complaints */ - if ((rtime1-rtime0) >= ACK_WAIT_PERIOD) { - ++count; - if (count < my_ACK_WAIT_TIMES) { - if (verbose>=1 && (count % 10 == 0)) - fprintf(stderr, " %d: resend cmd(%s) to machines:[ ", count, cmd_name[code]); - for(i=0; i<nMachines; ++i) { - if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) { - if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "%d ", i); - send_cmd(code, (int) i); - usleep(FAST); - } - } - if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "]\n"); - rtime0 = rtime1; - } else { /* allowable period of time has passed */ - fprintf(stderr, " Drop these bad machines:[ "); - for(i=0; i<nMachines; ++i) { - if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) { - /* fprintf(stderr, "%d ", i); */ - fprintf(stderr, "%s ", id2name(i)); - bad_machines[i]= BAD_MACHINE; - /* - The pages sent by the master will be ignored by - the bad machine, because the current_file nubmer - does not match. - */ - } - } - fprintf(stderr, "]\n"); - break; - } - } - } -} - -int is_it_missing(int page) -{ - return (missing_page_flag[page]==MISSING) ? TRUE : FALSE; -} - -int pr_missing_pages() -{ - int i, N, exit_code=0; - off_t delta; - unsigned int dp; - - for(i=0; i<nMachines; ++i) { - char name[PATH_MAX]; - N = get_total_missing_pages(i); - strcpy(name, id2name(i)); - if (strlen(name)==0) sprintf(name, "machine(%3d)", i); - fprintf(stderr, "%s: #_missing_page_request = %6.2f%% = %d\n", - name, (double)N/((double)total_pages)*100.0, N); - } - - if (skip_count>0) { - fprintf(stderr, "\nWarning: There are %d files which are not delivered.\n", skip_count); - exit_code = -1; - } - - fprintf(stderr, "\nTotal number of files = %12d Pages w/o ack = %12u (%6.2f%%)\n", - total_entries(), pages_wo_ack(), (double)pages_wo_ack()/(double)real_total_pages*100.0); - - dp = real_total_pages - total_pages; - fprintf(stderr, "Total number of pages = %12d Pages re-sent = %12u (%6.2f%%)\n", - total_pages, dp, (double)dp/(double)total_pages*100.0); - - delta = (off_t)(real_total_bytes - total_bytes); - #ifdef _LARGEFILE_SOURCE - fprintf(stderr, "Total number of bytes = %12llu Bytes re-sent = %12llu (%6.2f%%)\n", - total_bytes, delta, (double)delta/(double)total_bytes*100.0); - #else - fprintf(stderr, "Total number of bytes = %12d Bytes re-sent = %12u (%6.2f%%)\n", - total_bytes, delta, (double)delta/(double)total_bytes*100.0); - #endif - - return (exit_code); -} - -/* count the number of bad machines */ -int nBadMachines() -{ - int i, count = 0; - for(i=0; i<nMachines; ++i) { - if (bad_machines[i] == BAD_MACHINE) ++count; - } - - return count; -} - -int choose_print_machines(char *stArray, char selection, char * msg_prefix) -{ - int i, count = 0; - - for(i=0; i<nMachines; ++i) { - char line[PATH_MAX]; - if (stArray[i] == selection) { - ++count; - strcpy(line, id2name(i)); - if (count == 1) { fprintf(stderr, msg_prefix); } - if (strlen(line)==0) - fprintf(stderr, "%d ", i); - else - fprintf(stderr, "%s ", line); - } - } - if (count > 0) fprintf(stderr, "]\n"); - return count; -} - -int send_done_and_pr_msgs(double total_time, double t_page) -{ - int exit_code1 =0; - int exit_code2 =0; - int exit_code3 =0; - - send_all_done_cmd(); - - /* exit_code1 !=0 if there are files that were not delivered due to change or skipped */ - exit_code1 = pr_missing_pages(); - - fprintf(stderr, "Total time spent = %6.2f (min) ~ %6.2f (min/GB)\n\n", - total_time, total_time / ((double)real_total_bytes/1.0e9)); - fprintf(stderr, "Send pages time = %6.2f (min) ~ %6.2f (min/GB)\n\n", - t_page, t_page / ((double)real_total_bytes/1.0e9)); - - exit_code2 = choose_print_machines(bad_machines, - BAD_MACHINE, - "Not synced for bad machines:[ "); - - if (quitWithOneBad && nBadMachines() >=1) { - fprintf(stderr, "We choose to exit when at least one target is bad\n"); - fprintf(stderr, "All files following the current one did not get delivered\n"); - fprintf(stderr, "If resend cmd(CLOSE_FILE), then the current file may have been delivered to non-bad targets\n\n"); - } - - if (current_entry() < total_entries()) { /* if we exit prematurely */ - exit_code3 = choose_print_machines(machine_status, - NOT_READY, "\nNot-ready machines:[ "); - } - - if (verbose>=1) pr_rtt_hist(); - return (exit_code1+exit_code3); /* 200807 removed exit_code2 because bad machines case has been dealt with - by -q. If no -q, then the bad machines are considered 'harmless' */ -} - -/* to do some cleanup before exit IF all machines are bad */ -void do_badMachines_exit() -{ - if ((quitWithOneBad && nBadMachines() < 1) || - (!quitWithOneBad && (nBadMachines() < nMachines))) return; - - if (quitWithOneBad) - fprintf(stderr, "One (or more) machine is bad. Exit!\n"); - else - fprintf(stderr, "All machines are bad. Exit!\n"); - - send_done_and_pr_msgs(-1.0, -1.0); - exit(-1); -} - -void do_cntl_c(int signo) -{ - fprintf(stderr, "Control_C interrupt detected!\n"); - - send_done_and_pr_msgs(-1.0, -1.0); - exit(-1); -} |
