diff options
| author | Guillaume Horel <guillaume.horel@serenitascapital.com> | 2015-11-04 13:55:26 -0500 |
|---|---|---|
| committer | Guillaume Horel <guillaume.horel@serenitascapital.com> | 2015-11-04 13:55:26 -0500 |
| commit | ded46d4768498c7d27fedcc438fe80a59ad63d0c (patch) | |
| tree | 1158247ec3b9580a3deaa320334c2d777050b6b9 /multicaster.c | |
| parent | a5309fed914fdaa7697f2d369e7dcd02309063ab (diff) | |
| download | mrsync-ded46d4768498c7d27fedcc438fe80a59ad63d0c.tar.gz | |
move code into a src directory
Diffstat (limited to 'multicaster.c')
| -rw-r--r-- | multicaster.c | 582 |
1 files changed, 0 insertions, 582 deletions
diff --git a/multicaster.c b/multicaster.c deleted file mode 100644 index 3b26cae..0000000 --- a/multicaster.c +++ /dev/null @@ -1,582 +0,0 @@ -/* - Copyright (C) 2006-2008 Renaissance Technologies Corp. - main developer: HP Wei <hp@rentec.com> - version 3.0 major update - -- large file support - -- platform independence (between linux, unix) - -- backup feature (as in rsync) - -- removing meta-file-info - -- catching slow machine as the feedback monitor - -- mcast options - version 3.0.[1-9] bug fixes - -- logic flaw which under certain condition - caused premature dropout due to - unsuccessful EOF, CLOSE_FILE - and caused unwanranted SIT-OUT cases. - -- tested on Debian 64 bit arch by Nicolas Marot in France - version 3.1.0 - -- codes for IPv6 are ready (but not tested) - IPv4 is tested ok. - version 3.2.0 - -- monitor change improvement - -- handshake improvement (e.g. seq #) - -- if one machine skips a file, all will NOT close() - version 4.0 major update - -- consolidate sending missing pages in complaint flow - cutting the messages by one order magnitude - -- exit code adjustment - - Copyright (C) 2005 Renaissance Technologies Corp. - Copyright (C) 2001 Renaissance Technologies Corp. - main developer: HP Wei <hp@rentec.com> - This file was modified in 2001 from files in the program - multicaster copyrighted by Aaron Hillegass as found at - <http://sourceforge.net/projects/multicaster/> - - Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2, or (at your option) - any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; see the file COPYING. - If not, write to the Free Software Foundation, - 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -*/ - -#include "main.h" -#include <stdlib.h> -#include <limits.h> /* to define PATH_MAX */ -#include <sys/times.h> -#include <setjmp.h> -#include <signal.h> - -extern int verbose; -extern char * machine_list_file; /* defined in complaints.c */ -extern char * bad_machines; /* array of size nMachines, defined in complaints.c */ -extern char * file_received; -extern int * missing_pages; -extern int backup; -extern int nPattern; -extern char * pattern_baseDir; -extern int nTargets; -extern int my_ACK_WAIT_TIMES; -extern int toRmFirst; -extern int quitWithOneBad; -extern int skip_count; -extern int file_changed; - -char * my_MCAST_ADDR = MCAST_ADDR; -int my_FLOW_PORT = FLOW_PORT; -int my_PORT = PORT; -int my_TTL = MCAST_TTL; -int my_LOOP = MCAST_LOOP; -char * my_IFname = MCAST_IF; - -int monitorID = -1; - -int no_feedback_count; - -void usage() -{ - fprintf(stderr, - "multicaster (to copy files to many multicatchers) version %s)\n" - " Option list:\n" - " [ -v <verbose_level 0-2> ]\n" - " [ -w <ack_wait_times default= %d (secs) ]\n" - " [ -X toRmFirst_flag default= cp2tmp_and_mv ]\n" - " [ -q quit_with_1_bad defalut= quit_with_all_bad ]\n" - " -------- essential options ----------------------------------\n" - " -m <machine_list_filePath>\n" - " -s <data_source_path>\n" - " -f <synclist_filePath>\n" - " -------- options for backup ---------------------------------\n" - " [ -b flag to turn on backup ]\n" - " [ -r <filepath> for regex patterns for files needing backup ]\n" - " [ -d <basedir> for regex patterns ]\n" - " -------- mcast options --------------------------------------\n" - " [ -A <my_mcast_address default=%s> **same as for multicatcher ]\n" - " [ -P <my_PORT default=%d> **same as for multicatcher ]\n" - " [ -T <my_TTL default=%d> ]\n" - " [ -L flag turn on mcast_LOOP. default is off ]\n" - " [ -I <my_MCAST_IF default=NULL> ]\n", - VERSION, my_ACK_WAIT_TIMES, MCAST_ADDR, PORT, my_TTL); -} - - -int monitor_cmd(int cmd, int machineID) -{ - /* - Once this fx is called the old monitor will be - turned off. So, we need to make sure this fx - returns TRUE for a machine. Or the monitor_set_up - should fail. - */ - int count =0, resp; - set_delay(0, FAST); - send_cmd(cmd, machineID); - while (1) { /* wait for ack */ - resp = read_handle_complaint(cmd); - if (resp<0) { /* time-out */ - ++count; - send_cmd(cmd, machineID); - if (count > SET_MON_WAIT_TIMES) { - return FALSE; - } - } else if (resp==1) { /* got ack */ - return TRUE; - } - /* irrevelant resp -- continue */ - } -} - -int find_max_missing_machine(char *flags) -{ - /* flags[] -> which machines are not considered */ - int i, index = -1; - int max = -1; - int threshold; - - for(i=0; i<nTargets; ++i) { - int missing; - /** missing = total_missing_page[i]; **/ - missing = missing_pages[i]; /* for last file (OPEN) or this file (other cmds)*/ - if (missing > max && flags[i] == GOOD_MACHINE) { - max = missing; - index = i; - } - } - - threshold = (get_nPages() > SWITCH_THRESHOLD*100) ? - get_nPages() / 100 : SWITCH_THRESHOLD; - if (index>=0) { - if (monitorID>=0) { - if (flags[monitorID]==BAD_MACHINE) return index; - /** if (max <= (total_missing_page[monitorID] + threshold)) **/ - if (max <= (missing_pages[monitorID] + threshold)) - return monitorID; - } - return index; - } else { /* could come here if all are busy*/ - return -1; - } - - /** - return ((index >= 0) && - (monitorID >= 0) && - (bad_machines[monitorID] == GOOD_MACHINE) && - (max <= (total_missing_page[monitorID] + threshold))) ? - monitorID : index; - **/ -} - -void set_monitor(int mid) -{ - /* one machine is to be set. So need to succeed. */ - if (monitor_cmd(SELECT_MONITOR_CMD, mid)) { - if (verbose >=1) fprintf(stderr, "Monitor - %s\n", id2name(mid)); - return; - } else { - fprintf(stderr, "Fatal: monitor %s cannot be set up!\n", id2name(mid)); - send_done_and_pr_msgs(-1.0, -1.0); - exit(BAD_EXIT); - } -} - -void check_change_monitor(int undesired_index) -{ - /* this function changes the value of the global var: monitorID */ - int i, count; - char * flags; - - /* if all targets received the file, no need to go on */ - if ((count=nNotRecv())==0) return; - - if (count==1) { - i = iNotRecv(); - if (bad_machines[i] == GOOD_MACHINE) { - monitorID = i; - /* - 'i' could be the current monitor. - We'd like to set it because there might - be something wrong with it if we come to - to this point. - */ - set_monitor(monitorID); - } - return; - } - - /* more than two machines do not receive the file yet */ - /* flags mark those machines as BAD which we don't want to consider */ - flags = malloc(nTargets * sizeof(char)); - for(i=0; i<nTargets; ++i) { - flags[i] = (i == undesired_index || file_received[i] == FILE_RECV) ? - BAD_MACHINE : bad_machines[i]; - } - - /* check if all machines are not to be considered */ - count = 0; - for(i=0; i<nTargets; ++i) { - if (flags[i]==BAD_MACHINE) ++count; - } - if (count==nTargets) { - /* Two ways to get here are - (1) during do_one_page: - prev_monitor = (not_recv) and other not_recv's are bad_machines. - (2) during after-ack: - prev_monitor = (recv) and other not_recv's are bad_machines. - */ - free(flags); - set_monitor(monitorID); - return; - } - - /* at least one not_recv (and good) machine is to be considered */ - count = 0; - while (count < nTargets) { - i = find_max_missing_machine(flags); - /* if (i==monitorID) break;*/ - monitorID = i; - - if (monitorID < 0) break; - - if (monitor_cmd(SELECT_MONITOR_CMD, monitorID)) { - if (verbose >=1) fprintf(stderr, "Monitor = %s\n", id2name(monitorID)); - break; - } else { - flags[monitorID] = BAD_MACHINE; - /* Then, we attemp to set up other machine */ - } - ++count; - } - - free(flags); - if (monitorID < 0) { - fprintf(stderr, "Fatal: monitor machine cannot be set up!\n"); - send_done_and_pr_msgs(-1.0, -1.0); - exit(BAD_EXIT); - } -} - -void do_one_page(int page) -{ - int resp; - unsigned long rtt; - refresh_timer(); - start_timer(); - if (!send_page(page)) return; - - /* first ignore all irrelevant resp */ - resp=read_handle_complaint(SENDING_DATA); - while (resp==0) { - resp=read_handle_complaint(SENDING_DATA); - } - - /* read_handle_complaint() waits n*interpage_interval at most */ - if (resp==-1) { - /* delay_sec for readable() is set by set_delay() */ - /****** - at this point, the readable() returns without getting a reply - from monitorID after FACTOR*DT_PERPAGE (or DT_PERPAGE if without_monitor) - ****/ - ++no_feedback_count; - if (verbose>=2) printf("no reply, count = %d\n", no_feedback_count); - update_rtt_hist(999999); - /* register this page as rtt = infinite --- the last element in rtt_hist */ - - if (no_feedback_count > NO_FEEDBACK_COUNT_MAX) { - /* switch to another client */ - if (verbose >=2) - fprintf(stderr, - "Consecutive non_feedback exceeds limit, Changing monitor machine.\n"); - /* if (nTargets>1 && (nTargets - nBadMachines()) > 1 && nNotRecv() > 1) */ - check_change_monitor(monitorID); /* replace the current monitor */ - no_feedback_count = 0; - } - return; - } else { /* resp == 1 */ - end_timer(); - update_time_accumulator(); - rtt = get_accumulated_usec(); - /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */ - /* to do: update histogram */ - if (verbose >=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt); - update_rtt_hist(rtt); - - no_feedback_count = 0; - } -} - -void send_cmd_and_wait_ack(int cmd_code) -{ - send_cmd(cmd_code, (int) ALL_MACHINES); - refresh_machine_status(); - /*set_delay(0, FAST);*/ - set_delay(0, DT_PERPAGE*FACTOR); - if (cmd_code==EOF_CMD) mod_machine_status(); - wait_for_ok(cmd_code); - do_badMachines_exit(); - /* check_change_monitor(-1); */ -} - -int do_file_changed_skip() -{ - /* if file is changed during syncing, then we should skip this file */ - if (file_changed || !same_stat_for_file()) { - fprintf(stderr, "WARNING: file is changed during sycing -- skipping\n"); - send_cmd_and_wait_ack(CLOSE_ABORT_CMD); - free_missing_page_flag(); - ++skip_count; - return TRUE; - } - return FALSE; -} - -int main(int argc, char *argv[]) -{ - int c; - int cfile, ctotal_pages, cpage; - char * source_path = NULL; - char * synclist_path = NULL; - char * machine_list_file = NULL; - time_t tloc; - time_t time0, time1, t_page0, t_page; - - while ((c = getopt(argc, argv, "v:w:A:P:T:LI:m:s:f:br:d:Xq")) != EOF) { - switch (c) { - case 'v': - verbose = atoi(optarg); - break; - case 'w': - my_ACK_WAIT_TIMES = atoi(optarg); - break; - case 'A': - my_MCAST_ADDR = optarg; - break; - case 'P': - my_PORT = atoi(optarg); - my_FLOW_PORT = my_PORT -1; - break; - case 'T': - my_TTL = atoi(optarg); - break; - case 'L': - my_LOOP = TRUE; - break; - case 'I': - my_IFname = optarg; - break; - case 'm': - machine_list_file = optarg; - break; - case 's': - source_path = optarg; - break; - case 'f': - synclist_path = optarg; - break; - case 'b': - backup = TRUE; /* if nPattern==0, backup means back up ALL files */ - break; - case 'r': /* to selectively back up certain files as defined in the pattern */ - if (!read_backup_pattern(optarg)) { - fprintf(stderr, "Failed in loading regex patterns in file = %s\n", optarg); - exit(BAD_EXIT); - } - break; - case 'd': - pattern_baseDir = strdup(optarg); - if (pattern_baseDir[strlen(pattern_baseDir)-1]=='/') - pattern_baseDir[strlen(pattern_baseDir)-1] = '\0' ; /* remove last / */ - break; - case 'X': - toRmFirst = TRUE; - break; - case 'q': - quitWithOneBad = TRUE; - break; - case '?': - usage(); - exit(BAD_EXIT); - } - } - - if (!machine_list_file || !source_path || !synclist_path ) { - fprintf(stderr, "Essential options (-m -s -f) should be specified. \n"); - usage(); - exit(BAD_EXIT); - } - - if (nPattern>0) backup = TRUE; - if (backup && nPattern>0) { - if (!pattern_baseDir) pattern_baseDir = strdup(source_path); - if (strlen(source_path) < strlen(pattern_baseDir) || - strncmp(source_path, pattern_baseDir, strlen(pattern_baseDir))!=0) { - fprintf(stderr, - "src_path (%s) should include (and be longer than) pattern_baseDir (%s)", - source_path, pattern_baseDir); - exit(BAD_EXIT); - } - } - - if (backup && toRmFirst) { - fprintf(stderr, "-B and -X cannot co-exist\n"); - exit(BAD_EXIT); - } - - get_machine_names(machine_list_file); - if (nTargets==0) { - fprintf(stderr, "No target to sync to\n"); - exit(GOOD_EXIT); - } - - if (!init_synclist(synclist_path, source_path)) exit(BAD_EXIT); - - if (total_entries()==0) { - fprintf(stderr, "Nothing to sync in %s\n", synclist_path); - exit(GOOD_EXIT); - } - - if (verbose >= 2) - fprintf(stderr, "Total number of files: %d\n", total_entries()); - - /* init the network stuff and some flags */ - init_sends(); - init_complaints(); - init_machine_status(nTargets); - - /* set up Cntl_C catcher */ - Signal(SIGINT, do_cntl_c); - - /* ------------------- set up monitor machine for doing feedback for each page sent */ - check_change_monitor(-1); - - /*-------------------------------------------------------------------------------*/ - - init_rtt_hist(); - time0 = time(&tloc); /* start time */ - t_page = 0; /* total time for sending pages */ - - /* -----------------------------Send the file one by one -----------------------------------*/ - for (cfile = 1; cfile <= total_entries(); cfile++) { /* for each file to be synced */ - if (!get_next_entry(cfile)) continue; - - ctotal_pages = pages_for_file(); - - /* - By the time this file, which was obtained when synclist was - established some time ago, may no longer exist on the master. - So, we need to check the existence of this file. - fexist() also opens the file so that it won't be deleted - between here and the send-page-loop. - */ - if (ctotal_pages > 0 && (!same_stat_for_file() || - !fexist(current_entry()))) { - /* go to next file if this file has changed or does not exist */ - fprintf(stderr, "%s (%d out of %d; Extinct file)\n", - getFilename(), current_entry(), total_entries()); - adjust_totals(); - continue; - } - - if (ctotal_pages < 0) { - fprintf(stderr, "%s (%d out of %d; to delete)\n", - getFilename(), current_entry(), total_entries()); - } else { - fprintf(stderr, "%s (%d out of %d; %d pages)\n", - getFilename(), current_entry(), total_entries(), ctotal_pages); - } - - /* send_open_cmd */ - pack_open_file_info(); - send_cmd_and_wait_ack(OPEN_FILE_CMD); - - /* - ctotal_pages < 0, for deletion - ctotal_pages = 0, regular file with no content. - or directory, softlink, hardlink - both should have been finished with OPEN_FILE_CMD - */ - if (ctotal_pages <= 0) continue; - - /* for other regular files */ - init_missing_page_flag(ctotal_pages); - refresh_missing_pages(); /* total missing pages for this file for each tar */ - - /* ----- sending file data ----- first round */ - t_page0 = time(&tloc); - no_feedback_count = 0; - for (cpage = 1; cpage <= ctotal_pages; cpage++) { - /* - the mode field and delay may be changed by change_monitor - */ - set_delay(0, DT_PERPAGE*FACTOR); - set_mode(SENDING_DATA); - do_one_page(cpage); - } - - if (do_file_changed_skip()) continue; - - /* send "I am done with the first round" */ - reset_has_missing(); - refresh_file_received(); /* to record machines that have received this file */ - send_cmd_and_wait_ack(EOF_CMD); - - /* after the first run, before we go to 2nd and 3rd run, */ - if (has_missing_pages()) check_change_monitor(-1); - - /* ----- sending file data again, 2nd and 3rd and ...n-th round */ - reset_has_sick(); - while (has_missing_pages()) { - int c; /****************/ - no_feedback_count = 0; - - c = 0; - for (cpage = 1; cpage <= ctotal_pages; cpage++){ - if (is_it_missing(cpage-1)) { - set_delay(0, DT_PERPAGE*FACTOR); - set_mode(RESENDING_DATA); - do_one_page(cpage); - page_sent(cpage-1); - ++c; /*************/ - } - } - if (verbose>=1) - fprintf(stderr, "re-sent N_pages = %d\n", c); /*************/ - - /* eof */ - reset_has_missing(); - send_cmd_and_wait_ack(EOF_CMD); - if (has_sick_machines()) { - break; - /* one machine can reach sick_state while some others are still - in missing_page state. - This break here is ok in terms of skipping this file.*/ - } else { - check_change_monitor(-1); - } - }; - - t_page += (time(&tloc) - t_page0);; - if (do_file_changed_skip()) continue; - - /* close file */ - send_cmd_and_wait_ack((has_sick_machines()) ? CLOSE_ABORT_CMD : CLOSE_FILE_CMD); - if (has_sick_machines()) { - fprintf(stderr, "Skip_syncing %s\n", getFilename()); - } - free_missing_page_flag(); - } /* end of the for each_file loop */ - - time1= time(&tloc); - return send_done_and_pr_msgs( ((double)(time1 - time0))/ 60.0, ((double)t_page)/60.0); -} - |
