diff options
| author | Guillaume Horel <guillaume.horel@serenitascapital.com> | 2015-11-04 12:30:44 -0500 |
|---|---|---|
| committer | Guillaume Horel <guillaume.horel@serenitascapital.com> | 2015-11-04 12:30:44 -0500 |
| commit | a5309fed914fdaa7697f2d369e7dcd02309063ab (patch) | |
| tree | 975bb588c4d9072ae1158ab670bf9fa851abd6f4 /multicaster.c | |
| download | mrsync-a5309fed914fdaa7697f2d369e7dcd02309063ab.tar.gz | |
initial import
Diffstat (limited to 'multicaster.c')
| -rw-r--r-- | multicaster.c | 582 |
1 files changed, 582 insertions, 0 deletions
diff --git a/multicaster.c b/multicaster.c new file mode 100644 index 0000000..3b26cae --- /dev/null +++ b/multicaster.c @@ -0,0 +1,582 @@ +/* + Copyright (C) 2006-2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + version 3.0 major update + -- large file support + -- platform independence (between linux, unix) + -- backup feature (as in rsync) + -- removing meta-file-info + -- catching slow machine as the feedback monitor + -- mcast options + version 3.0.[1-9] bug fixes + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwanranted SIT-OUT cases. + -- tested on Debian 64 bit arch by Nicolas Marot in France + version 3.1.0 + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. + version 3.2.0 + -- monitor change improvement + -- handshake improvement (e.g. seq #) + -- if one machine skips a file, all will NOT close() + version 4.0 major update + -- consolidate sending missing pages in complaint flow + cutting the messages by one order magnitude + -- exit code adjustment + + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + This file was modified in 2001 from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include <stdlib.h> +#include <limits.h> /* to define PATH_MAX */ +#include <sys/times.h> +#include <setjmp.h> +#include <signal.h> + +extern int verbose; +extern char * machine_list_file; /* defined in complaints.c */ +extern char * bad_machines; /* array of size nMachines, defined in complaints.c */ +extern char * file_received; +extern int * missing_pages; +extern int backup; +extern int nPattern; +extern char * pattern_baseDir; +extern int nTargets; +extern int my_ACK_WAIT_TIMES; +extern int toRmFirst; +extern int quitWithOneBad; +extern int skip_count; +extern int file_changed; + +char * my_MCAST_ADDR = MCAST_ADDR; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; +int my_TTL = MCAST_TTL; +int my_LOOP = MCAST_LOOP; +char * my_IFname = MCAST_IF; + +int monitorID = -1; + +int no_feedback_count; + +void usage() +{ + fprintf(stderr, + "multicaster (to copy files to many multicatchers) version %s)\n" + " Option list:\n" + " [ -v <verbose_level 0-2> ]\n" + " [ -w <ack_wait_times default= %d (secs) ]\n" + " [ -X toRmFirst_flag default= cp2tmp_and_mv ]\n" + " [ -q quit_with_1_bad defalut= quit_with_all_bad ]\n" + " -------- essential options ----------------------------------\n" + " -m <machine_list_filePath>\n" + " -s <data_source_path>\n" + " -f <synclist_filePath>\n" + " -------- options for backup ---------------------------------\n" + " [ -b flag to turn on backup ]\n" + " [ -r <filepath> for regex patterns for files needing backup ]\n" + " [ -d <basedir> for regex patterns ]\n" + " -------- mcast options --------------------------------------\n" + " [ -A <my_mcast_address default=%s> **same as for multicatcher ]\n" + " [ -P <my_PORT default=%d> **same as for multicatcher ]\n" + " [ -T <my_TTL default=%d> ]\n" + " [ -L flag turn on mcast_LOOP. default is off ]\n" + " [ -I <my_MCAST_IF default=NULL> ]\n", + VERSION, my_ACK_WAIT_TIMES, MCAST_ADDR, PORT, my_TTL); +} + + +int monitor_cmd(int cmd, int machineID) +{ + /* + Once this fx is called the old monitor will be + turned off. So, we need to make sure this fx + returns TRUE for a machine. Or the monitor_set_up + should fail. + */ + int count =0, resp; + set_delay(0, FAST); + send_cmd(cmd, machineID); + while (1) { /* wait for ack */ + resp = read_handle_complaint(cmd); + if (resp<0) { /* time-out */ + ++count; + send_cmd(cmd, machineID); + if (count > SET_MON_WAIT_TIMES) { + return FALSE; + } + } else if (resp==1) { /* got ack */ + return TRUE; + } + /* irrevelant resp -- continue */ + } +} + +int find_max_missing_machine(char *flags) +{ + /* flags[] -> which machines are not considered */ + int i, index = -1; + int max = -1; + int threshold; + + for(i=0; i<nTargets; ++i) { + int missing; + /** missing = total_missing_page[i]; **/ + missing = missing_pages[i]; /* for last file (OPEN) or this file (other cmds)*/ + if (missing > max && flags[i] == GOOD_MACHINE) { + max = missing; + index = i; + } + } + + threshold = (get_nPages() > SWITCH_THRESHOLD*100) ? + get_nPages() / 100 : SWITCH_THRESHOLD; + if (index>=0) { + if (monitorID>=0) { + if (flags[monitorID]==BAD_MACHINE) return index; + /** if (max <= (total_missing_page[monitorID] + threshold)) **/ + if (max <= (missing_pages[monitorID] + threshold)) + return monitorID; + } + return index; + } else { /* could come here if all are busy*/ + return -1; + } + + /** + return ((index >= 0) && + (monitorID >= 0) && + (bad_machines[monitorID] == GOOD_MACHINE) && + (max <= (total_missing_page[monitorID] + threshold))) ? + monitorID : index; + **/ +} + +void set_monitor(int mid) +{ + /* one machine is to be set. So need to succeed. */ + if (monitor_cmd(SELECT_MONITOR_CMD, mid)) { + if (verbose >=1) fprintf(stderr, "Monitor - %s\n", id2name(mid)); + return; + } else { + fprintf(stderr, "Fatal: monitor %s cannot be set up!\n", id2name(mid)); + send_done_and_pr_msgs(-1.0, -1.0); + exit(BAD_EXIT); + } +} + +void check_change_monitor(int undesired_index) +{ + /* this function changes the value of the global var: monitorID */ + int i, count; + char * flags; + + /* if all targets received the file, no need to go on */ + if ((count=nNotRecv())==0) return; + + if (count==1) { + i = iNotRecv(); + if (bad_machines[i] == GOOD_MACHINE) { + monitorID = i; + /* + 'i' could be the current monitor. + We'd like to set it because there might + be something wrong with it if we come to + to this point. + */ + set_monitor(monitorID); + } + return; + } + + /* more than two machines do not receive the file yet */ + /* flags mark those machines as BAD which we don't want to consider */ + flags = malloc(nTargets * sizeof(char)); + for(i=0; i<nTargets; ++i) { + flags[i] = (i == undesired_index || file_received[i] == FILE_RECV) ? + BAD_MACHINE : bad_machines[i]; + } + + /* check if all machines are not to be considered */ + count = 0; + for(i=0; i<nTargets; ++i) { + if (flags[i]==BAD_MACHINE) ++count; + } + if (count==nTargets) { + /* Two ways to get here are + (1) during do_one_page: + prev_monitor = (not_recv) and other not_recv's are bad_machines. + (2) during after-ack: + prev_monitor = (recv) and other not_recv's are bad_machines. + */ + free(flags); + set_monitor(monitorID); + return; + } + + /* at least one not_recv (and good) machine is to be considered */ + count = 0; + while (count < nTargets) { + i = find_max_missing_machine(flags); + /* if (i==monitorID) break;*/ + monitorID = i; + + if (monitorID < 0) break; + + if (monitor_cmd(SELECT_MONITOR_CMD, monitorID)) { + if (verbose >=1) fprintf(stderr, "Monitor = %s\n", id2name(monitorID)); + break; + } else { + flags[monitorID] = BAD_MACHINE; + /* Then, we attemp to set up other machine */ + } + ++count; + } + + free(flags); + if (monitorID < 0) { + fprintf(stderr, "Fatal: monitor machine cannot be set up!\n"); + send_done_and_pr_msgs(-1.0, -1.0); + exit(BAD_EXIT); + } +} + +void do_one_page(int page) +{ + int resp; + unsigned long rtt; + refresh_timer(); + start_timer(); + if (!send_page(page)) return; + + /* first ignore all irrelevant resp */ + resp=read_handle_complaint(SENDING_DATA); + while (resp==0) { + resp=read_handle_complaint(SENDING_DATA); + } + + /* read_handle_complaint() waits n*interpage_interval at most */ + if (resp==-1) { + /* delay_sec for readable() is set by set_delay() */ + /****** + at this point, the readable() returns without getting a reply + from monitorID after FACTOR*DT_PERPAGE (or DT_PERPAGE if without_monitor) + ****/ + ++no_feedback_count; + if (verbose>=2) printf("no reply, count = %d\n", no_feedback_count); + update_rtt_hist(999999); + /* register this page as rtt = infinite --- the last element in rtt_hist */ + + if (no_feedback_count > NO_FEEDBACK_COUNT_MAX) { + /* switch to another client */ + if (verbose >=2) + fprintf(stderr, + "Consecutive non_feedback exceeds limit, Changing monitor machine.\n"); + /* if (nTargets>1 && (nTargets - nBadMachines()) > 1 && nNotRecv() > 1) */ + check_change_monitor(monitorID); /* replace the current monitor */ + no_feedback_count = 0; + } + return; + } else { /* resp == 1 */ + end_timer(); + update_time_accumulator(); + rtt = get_accumulated_usec(); + /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */ + /* to do: update histogram */ + if (verbose >=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt); + update_rtt_hist(rtt); + + no_feedback_count = 0; + } +} + +void send_cmd_and_wait_ack(int cmd_code) +{ + send_cmd(cmd_code, (int) ALL_MACHINES); + refresh_machine_status(); + /*set_delay(0, FAST);*/ + set_delay(0, DT_PERPAGE*FACTOR); + if (cmd_code==EOF_CMD) mod_machine_status(); + wait_for_ok(cmd_code); + do_badMachines_exit(); + /* check_change_monitor(-1); */ +} + +int do_file_changed_skip() +{ + /* if file is changed during syncing, then we should skip this file */ + if (file_changed || !same_stat_for_file()) { + fprintf(stderr, "WARNING: file is changed during sycing -- skipping\n"); + send_cmd_and_wait_ack(CLOSE_ABORT_CMD); + free_missing_page_flag(); + ++skip_count; + return TRUE; + } + return FALSE; +} + +int main(int argc, char *argv[]) +{ + int c; + int cfile, ctotal_pages, cpage; + char * source_path = NULL; + char * synclist_path = NULL; + char * machine_list_file = NULL; + time_t tloc; + time_t time0, time1, t_page0, t_page; + + while ((c = getopt(argc, argv, "v:w:A:P:T:LI:m:s:f:br:d:Xq")) != EOF) { + switch (c) { + case 'v': + verbose = atoi(optarg); + break; + case 'w': + my_ACK_WAIT_TIMES = atoi(optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'T': + my_TTL = atoi(optarg); + break; + case 'L': + my_LOOP = TRUE; + break; + case 'I': + my_IFname = optarg; + break; + case 'm': + machine_list_file = optarg; + break; + case 's': + source_path = optarg; + break; + case 'f': + synclist_path = optarg; + break; + case 'b': + backup = TRUE; /* if nPattern==0, backup means back up ALL files */ + break; + case 'r': /* to selectively back up certain files as defined in the pattern */ + if (!read_backup_pattern(optarg)) { + fprintf(stderr, "Failed in loading regex patterns in file = %s\n", optarg); + exit(BAD_EXIT); + } + break; + case 'd': + pattern_baseDir = strdup(optarg); + if (pattern_baseDir[strlen(pattern_baseDir)-1]=='/') + pattern_baseDir[strlen(pattern_baseDir)-1] = '\0' ; /* remove last / */ + break; + case 'X': + toRmFirst = TRUE; + break; + case 'q': + quitWithOneBad = TRUE; + break; + case '?': + usage(); + exit(BAD_EXIT); + } + } + + if (!machine_list_file || !source_path || !synclist_path ) { + fprintf(stderr, "Essential options (-m -s -f) should be specified. \n"); + usage(); + exit(BAD_EXIT); + } + + if (nPattern>0) backup = TRUE; + if (backup && nPattern>0) { + if (!pattern_baseDir) pattern_baseDir = strdup(source_path); + if (strlen(source_path) < strlen(pattern_baseDir) || + strncmp(source_path, pattern_baseDir, strlen(pattern_baseDir))!=0) { + fprintf(stderr, + "src_path (%s) should include (and be longer than) pattern_baseDir (%s)", + source_path, pattern_baseDir); + exit(BAD_EXIT); + } + } + + if (backup && toRmFirst) { + fprintf(stderr, "-B and -X cannot co-exist\n"); + exit(BAD_EXIT); + } + + get_machine_names(machine_list_file); + if (nTargets==0) { + fprintf(stderr, "No target to sync to\n"); + exit(GOOD_EXIT); + } + + if (!init_synclist(synclist_path, source_path)) exit(BAD_EXIT); + + if (total_entries()==0) { + fprintf(stderr, "Nothing to sync in %s\n", synclist_path); + exit(GOOD_EXIT); + } + + if (verbose >= 2) + fprintf(stderr, "Total number of files: %d\n", total_entries()); + + /* init the network stuff and some flags */ + init_sends(); + init_complaints(); + init_machine_status(nTargets); + + /* set up Cntl_C catcher */ + Signal(SIGINT, do_cntl_c); + + /* ------------------- set up monitor machine for doing feedback for each page sent */ + check_change_monitor(-1); + + /*-------------------------------------------------------------------------------*/ + + init_rtt_hist(); + time0 = time(&tloc); /* start time */ + t_page = 0; /* total time for sending pages */ + + /* -----------------------------Send the file one by one -----------------------------------*/ + for (cfile = 1; cfile <= total_entries(); cfile++) { /* for each file to be synced */ + if (!get_next_entry(cfile)) continue; + + ctotal_pages = pages_for_file(); + + /* + By the time this file, which was obtained when synclist was + established some time ago, may no longer exist on the master. + So, we need to check the existence of this file. + fexist() also opens the file so that it won't be deleted + between here and the send-page-loop. + */ + if (ctotal_pages > 0 && (!same_stat_for_file() || + !fexist(current_entry()))) { + /* go to next file if this file has changed or does not exist */ + fprintf(stderr, "%s (%d out of %d; Extinct file)\n", + getFilename(), current_entry(), total_entries()); + adjust_totals(); + continue; + } + + if (ctotal_pages < 0) { + fprintf(stderr, "%s (%d out of %d; to delete)\n", + getFilename(), current_entry(), total_entries()); + } else { + fprintf(stderr, "%s (%d out of %d; %d pages)\n", + getFilename(), current_entry(), total_entries(), ctotal_pages); + } + + /* send_open_cmd */ + pack_open_file_info(); + send_cmd_and_wait_ack(OPEN_FILE_CMD); + + /* + ctotal_pages < 0, for deletion + ctotal_pages = 0, regular file with no content. + or directory, softlink, hardlink + both should have been finished with OPEN_FILE_CMD + */ + if (ctotal_pages <= 0) continue; + + /* for other regular files */ + init_missing_page_flag(ctotal_pages); + refresh_missing_pages(); /* total missing pages for this file for each tar */ + + /* ----- sending file data ----- first round */ + t_page0 = time(&tloc); + no_feedback_count = 0; + for (cpage = 1; cpage <= ctotal_pages; cpage++) { + /* + the mode field and delay may be changed by change_monitor + */ + set_delay(0, DT_PERPAGE*FACTOR); + set_mode(SENDING_DATA); + do_one_page(cpage); + } + + if (do_file_changed_skip()) continue; + + /* send "I am done with the first round" */ + reset_has_missing(); + refresh_file_received(); /* to record machines that have received this file */ + send_cmd_and_wait_ack(EOF_CMD); + + /* after the first run, before we go to 2nd and 3rd run, */ + if (has_missing_pages()) check_change_monitor(-1); + + /* ----- sending file data again, 2nd and 3rd and ...n-th round */ + reset_has_sick(); + while (has_missing_pages()) { + int c; /****************/ + no_feedback_count = 0; + + c = 0; + for (cpage = 1; cpage <= ctotal_pages; cpage++){ + if (is_it_missing(cpage-1)) { + set_delay(0, DT_PERPAGE*FACTOR); + set_mode(RESENDING_DATA); + do_one_page(cpage); + page_sent(cpage-1); + ++c; /*************/ + } + } + if (verbose>=1) + fprintf(stderr, "re-sent N_pages = %d\n", c); /*************/ + + /* eof */ + reset_has_missing(); + send_cmd_and_wait_ack(EOF_CMD); + if (has_sick_machines()) { + break; + /* one machine can reach sick_state while some others are still + in missing_page state. + This break here is ok in terms of skipping this file.*/ + } else { + check_change_monitor(-1); + } + }; + + t_page += (time(&tloc) - t_page0);; + if (do_file_changed_skip()) continue; + + /* close file */ + send_cmd_and_wait_ack((has_sick_machines()) ? CLOSE_ABORT_CMD : CLOSE_FILE_CMD); + if (has_sick_machines()) { + fprintf(stderr, "Skip_syncing %s\n", getFilename()); + } + free_missing_page_flag(); + } /* end of the for each_file loop */ + + time1= time(&tloc); + return send_done_and_pr_msgs( ((double)(time1 - time0))/ 60.0, ((double)t_page)/60.0); +} + |
