From ded46d4768498c7d27fedcc438fe80a59ad63d0c Mon Sep 17 00:00:00 2001 From: Guillaume Horel Date: Wed, 4 Nov 2015 13:55:26 -0500 Subject: move code into a src directory --- src/multicaster.c | 582 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 582 insertions(+) create mode 100644 src/multicaster.c (limited to 'src/multicaster.c') diff --git a/src/multicaster.c b/src/multicaster.c new file mode 100644 index 0000000..3b26cae --- /dev/null +++ b/src/multicaster.c @@ -0,0 +1,582 @@ +/* + Copyright (C) 2006-2008 Renaissance Technologies Corp. + main developer: HP Wei + version 3.0 major update + -- large file support + -- platform independence (between linux, unix) + -- backup feature (as in rsync) + -- removing meta-file-info + -- catching slow machine as the feedback monitor + -- mcast options + version 3.0.[1-9] bug fixes + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwanranted SIT-OUT cases. + -- tested on Debian 64 bit arch by Nicolas Marot in France + version 3.1.0 + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. + version 3.2.0 + -- monitor change improvement + -- handshake improvement (e.g. seq #) + -- if one machine skips a file, all will NOT close() + version 4.0 major update + -- consolidate sending missing pages in complaint flow + cutting the messages by one order magnitude + -- exit code adjustment + + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#include /* to define PATH_MAX */ +#include +#include +#include + +extern int verbose; +extern char * machine_list_file; /* defined in complaints.c */ +extern char * bad_machines; /* array of size nMachines, defined in complaints.c */ +extern char * file_received; +extern int * missing_pages; +extern int backup; +extern int nPattern; +extern char * pattern_baseDir; +extern int nTargets; +extern int my_ACK_WAIT_TIMES; +extern int toRmFirst; +extern int quitWithOneBad; +extern int skip_count; +extern int file_changed; + +char * my_MCAST_ADDR = MCAST_ADDR; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; +int my_TTL = MCAST_TTL; +int my_LOOP = MCAST_LOOP; +char * my_IFname = MCAST_IF; + +int monitorID = -1; + +int no_feedback_count; + +void usage() +{ + fprintf(stderr, + "multicaster (to copy files to many multicatchers) version %s)\n" + " Option list:\n" + " [ -v ]\n" + " [ -w \n" + " -s \n" + " -f \n" + " -------- options for backup ---------------------------------\n" + " [ -b flag to turn on backup ]\n" + " [ -r for regex patterns for files needing backup ]\n" + " [ -d for regex patterns ]\n" + " -------- mcast options --------------------------------------\n" + " [ -A **same as for multicatcher ]\n" + " [ -P **same as for multicatcher ]\n" + " [ -T ]\n" + " [ -L flag turn on mcast_LOOP. default is off ]\n" + " [ -I ]\n", + VERSION, my_ACK_WAIT_TIMES, MCAST_ADDR, PORT, my_TTL); +} + + +int monitor_cmd(int cmd, int machineID) +{ + /* + Once this fx is called the old monitor will be + turned off. So, we need to make sure this fx + returns TRUE for a machine. Or the monitor_set_up + should fail. + */ + int count =0, resp; + set_delay(0, FAST); + send_cmd(cmd, machineID); + while (1) { /* wait for ack */ + resp = read_handle_complaint(cmd); + if (resp<0) { /* time-out */ + ++count; + send_cmd(cmd, machineID); + if (count > SET_MON_WAIT_TIMES) { + return FALSE; + } + } else if (resp==1) { /* got ack */ + return TRUE; + } + /* irrevelant resp -- continue */ + } +} + +int find_max_missing_machine(char *flags) +{ + /* flags[] -> which machines are not considered */ + int i, index = -1; + int max = -1; + int threshold; + + for(i=0; i max && flags[i] == GOOD_MACHINE) { + max = missing; + index = i; + } + } + + threshold = (get_nPages() > SWITCH_THRESHOLD*100) ? + get_nPages() / 100 : SWITCH_THRESHOLD; + if (index>=0) { + if (monitorID>=0) { + if (flags[monitorID]==BAD_MACHINE) return index; + /** if (max <= (total_missing_page[monitorID] + threshold)) **/ + if (max <= (missing_pages[monitorID] + threshold)) + return monitorID; + } + return index; + } else { /* could come here if all are busy*/ + return -1; + } + + /** + return ((index >= 0) && + (monitorID >= 0) && + (bad_machines[monitorID] == GOOD_MACHINE) && + (max <= (total_missing_page[monitorID] + threshold))) ? + monitorID : index; + **/ +} + +void set_monitor(int mid) +{ + /* one machine is to be set. So need to succeed. */ + if (monitor_cmd(SELECT_MONITOR_CMD, mid)) { + if (verbose >=1) fprintf(stderr, "Monitor - %s\n", id2name(mid)); + return; + } else { + fprintf(stderr, "Fatal: monitor %s cannot be set up!\n", id2name(mid)); + send_done_and_pr_msgs(-1.0, -1.0); + exit(BAD_EXIT); + } +} + +void check_change_monitor(int undesired_index) +{ + /* this function changes the value of the global var: monitorID */ + int i, count; + char * flags; + + /* if all targets received the file, no need to go on */ + if ((count=nNotRecv())==0) return; + + if (count==1) { + i = iNotRecv(); + if (bad_machines[i] == GOOD_MACHINE) { + monitorID = i; + /* + 'i' could be the current monitor. + We'd like to set it because there might + be something wrong with it if we come to + to this point. + */ + set_monitor(monitorID); + } + return; + } + + /* more than two machines do not receive the file yet */ + /* flags mark those machines as BAD which we don't want to consider */ + flags = malloc(nTargets * sizeof(char)); + for(i=0; i=1) fprintf(stderr, "Monitor = %s\n", id2name(monitorID)); + break; + } else { + flags[monitorID] = BAD_MACHINE; + /* Then, we attemp to set up other machine */ + } + ++count; + } + + free(flags); + if (monitorID < 0) { + fprintf(stderr, "Fatal: monitor machine cannot be set up!\n"); + send_done_and_pr_msgs(-1.0, -1.0); + exit(BAD_EXIT); + } +} + +void do_one_page(int page) +{ + int resp; + unsigned long rtt; + refresh_timer(); + start_timer(); + if (!send_page(page)) return; + + /* first ignore all irrelevant resp */ + resp=read_handle_complaint(SENDING_DATA); + while (resp==0) { + resp=read_handle_complaint(SENDING_DATA); + } + + /* read_handle_complaint() waits n*interpage_interval at most */ + if (resp==-1) { + /* delay_sec for readable() is set by set_delay() */ + /****** + at this point, the readable() returns without getting a reply + from monitorID after FACTOR*DT_PERPAGE (or DT_PERPAGE if without_monitor) + ****/ + ++no_feedback_count; + if (verbose>=2) printf("no reply, count = %d\n", no_feedback_count); + update_rtt_hist(999999); + /* register this page as rtt = infinite --- the last element in rtt_hist */ + + if (no_feedback_count > NO_FEEDBACK_COUNT_MAX) { + /* switch to another client */ + if (verbose >=2) + fprintf(stderr, + "Consecutive non_feedback exceeds limit, Changing monitor machine.\n"); + /* if (nTargets>1 && (nTargets - nBadMachines()) > 1 && nNotRecv() > 1) */ + check_change_monitor(monitorID); /* replace the current monitor */ + no_feedback_count = 0; + } + return; + } else { /* resp == 1 */ + end_timer(); + update_time_accumulator(); + rtt = get_accumulated_usec(); + /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */ + /* to do: update histogram */ + if (verbose >=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt); + update_rtt_hist(rtt); + + no_feedback_count = 0; + } +} + +void send_cmd_and_wait_ack(int cmd_code) +{ + send_cmd(cmd_code, (int) ALL_MACHINES); + refresh_machine_status(); + /*set_delay(0, FAST);*/ + set_delay(0, DT_PERPAGE*FACTOR); + if (cmd_code==EOF_CMD) mod_machine_status(); + wait_for_ok(cmd_code); + do_badMachines_exit(); + /* check_change_monitor(-1); */ +} + +int do_file_changed_skip() +{ + /* if file is changed during syncing, then we should skip this file */ + if (file_changed || !same_stat_for_file()) { + fprintf(stderr, "WARNING: file is changed during sycing -- skipping\n"); + send_cmd_and_wait_ack(CLOSE_ABORT_CMD); + free_missing_page_flag(); + ++skip_count; + return TRUE; + } + return FALSE; +} + +int main(int argc, char *argv[]) +{ + int c; + int cfile, ctotal_pages, cpage; + char * source_path = NULL; + char * synclist_path = NULL; + char * machine_list_file = NULL; + time_t tloc; + time_t time0, time1, t_page0, t_page; + + while ((c = getopt(argc, argv, "v:w:A:P:T:LI:m:s:f:br:d:Xq")) != EOF) { + switch (c) { + case 'v': + verbose = atoi(optarg); + break; + case 'w': + my_ACK_WAIT_TIMES = atoi(optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'T': + my_TTL = atoi(optarg); + break; + case 'L': + my_LOOP = TRUE; + break; + case 'I': + my_IFname = optarg; + break; + case 'm': + machine_list_file = optarg; + break; + case 's': + source_path = optarg; + break; + case 'f': + synclist_path = optarg; + break; + case 'b': + backup = TRUE; /* if nPattern==0, backup means back up ALL files */ + break; + case 'r': /* to selectively back up certain files as defined in the pattern */ + if (!read_backup_pattern(optarg)) { + fprintf(stderr, "Failed in loading regex patterns in file = %s\n", optarg); + exit(BAD_EXIT); + } + break; + case 'd': + pattern_baseDir = strdup(optarg); + if (pattern_baseDir[strlen(pattern_baseDir)-1]=='/') + pattern_baseDir[strlen(pattern_baseDir)-1] = '\0' ; /* remove last / */ + break; + case 'X': + toRmFirst = TRUE; + break; + case 'q': + quitWithOneBad = TRUE; + break; + case '?': + usage(); + exit(BAD_EXIT); + } + } + + if (!machine_list_file || !source_path || !synclist_path ) { + fprintf(stderr, "Essential options (-m -s -f) should be specified. \n"); + usage(); + exit(BAD_EXIT); + } + + if (nPattern>0) backup = TRUE; + if (backup && nPattern>0) { + if (!pattern_baseDir) pattern_baseDir = strdup(source_path); + if (strlen(source_path) < strlen(pattern_baseDir) || + strncmp(source_path, pattern_baseDir, strlen(pattern_baseDir))!=0) { + fprintf(stderr, + "src_path (%s) should include (and be longer than) pattern_baseDir (%s)", + source_path, pattern_baseDir); + exit(BAD_EXIT); + } + } + + if (backup && toRmFirst) { + fprintf(stderr, "-B and -X cannot co-exist\n"); + exit(BAD_EXIT); + } + + get_machine_names(machine_list_file); + if (nTargets==0) { + fprintf(stderr, "No target to sync to\n"); + exit(GOOD_EXIT); + } + + if (!init_synclist(synclist_path, source_path)) exit(BAD_EXIT); + + if (total_entries()==0) { + fprintf(stderr, "Nothing to sync in %s\n", synclist_path); + exit(GOOD_EXIT); + } + + if (verbose >= 2) + fprintf(stderr, "Total number of files: %d\n", total_entries()); + + /* init the network stuff and some flags */ + init_sends(); + init_complaints(); + init_machine_status(nTargets); + + /* set up Cntl_C catcher */ + Signal(SIGINT, do_cntl_c); + + /* ------------------- set up monitor machine for doing feedback for each page sent */ + check_change_monitor(-1); + + /*-------------------------------------------------------------------------------*/ + + init_rtt_hist(); + time0 = time(&tloc); /* start time */ + t_page = 0; /* total time for sending pages */ + + /* -----------------------------Send the file one by one -----------------------------------*/ + for (cfile = 1; cfile <= total_entries(); cfile++) { /* for each file to be synced */ + if (!get_next_entry(cfile)) continue; + + ctotal_pages = pages_for_file(); + + /* + By the time this file, which was obtained when synclist was + established some time ago, may no longer exist on the master. + So, we need to check the existence of this file. + fexist() also opens the file so that it won't be deleted + between here and the send-page-loop. + */ + if (ctotal_pages > 0 && (!same_stat_for_file() || + !fexist(current_entry()))) { + /* go to next file if this file has changed or does not exist */ + fprintf(stderr, "%s (%d out of %d; Extinct file)\n", + getFilename(), current_entry(), total_entries()); + adjust_totals(); + continue; + } + + if (ctotal_pages < 0) { + fprintf(stderr, "%s (%d out of %d; to delete)\n", + getFilename(), current_entry(), total_entries()); + } else { + fprintf(stderr, "%s (%d out of %d; %d pages)\n", + getFilename(), current_entry(), total_entries(), ctotal_pages); + } + + /* send_open_cmd */ + pack_open_file_info(); + send_cmd_and_wait_ack(OPEN_FILE_CMD); + + /* + ctotal_pages < 0, for deletion + ctotal_pages = 0, regular file with no content. + or directory, softlink, hardlink + both should have been finished with OPEN_FILE_CMD + */ + if (ctotal_pages <= 0) continue; + + /* for other regular files */ + init_missing_page_flag(ctotal_pages); + refresh_missing_pages(); /* total missing pages for this file for each tar */ + + /* ----- sending file data ----- first round */ + t_page0 = time(&tloc); + no_feedback_count = 0; + for (cpage = 1; cpage <= ctotal_pages; cpage++) { + /* + the mode field and delay may be changed by change_monitor + */ + set_delay(0, DT_PERPAGE*FACTOR); + set_mode(SENDING_DATA); + do_one_page(cpage); + } + + if (do_file_changed_skip()) continue; + + /* send "I am done with the first round" */ + reset_has_missing(); + refresh_file_received(); /* to record machines that have received this file */ + send_cmd_and_wait_ack(EOF_CMD); + + /* after the first run, before we go to 2nd and 3rd run, */ + if (has_missing_pages()) check_change_monitor(-1); + + /* ----- sending file data again, 2nd and 3rd and ...n-th round */ + reset_has_sick(); + while (has_missing_pages()) { + int c; /****************/ + no_feedback_count = 0; + + c = 0; + for (cpage = 1; cpage <= ctotal_pages; cpage++){ + if (is_it_missing(cpage-1)) { + set_delay(0, DT_PERPAGE*FACTOR); + set_mode(RESENDING_DATA); + do_one_page(cpage); + page_sent(cpage-1); + ++c; /*************/ + } + } + if (verbose>=1) + fprintf(stderr, "re-sent N_pages = %d\n", c); /*************/ + + /* eof */ + reset_has_missing(); + send_cmd_and_wait_ack(EOF_CMD); + if (has_sick_machines()) { + break; + /* one machine can reach sick_state while some others are still + in missing_page state. + This break here is ok in terms of skipping this file.*/ + } else { + check_change_monitor(-1); + } + }; + + t_page += (time(&tloc) - t_page0);; + if (do_file_changed_skip()) continue; + + /* close file */ + send_cmd_and_wait_ack((has_sick_machines()) ? CLOSE_ABORT_CMD : CLOSE_FILE_CMD); + if (has_sick_machines()) { + fprintf(stderr, "Skip_syncing %s\n", getFilename()); + } + free_missing_page_flag(); + } /* end of the for each_file loop */ + + time1= time(&tloc); + return send_done_and_pr_msgs( ((double)(time1 - time0))/ 60.0, ((double)t_page)/60.0); +} + -- cgit v1.2.3-70-g09d2