From ded46d4768498c7d27fedcc438fe80a59ad63d0c Mon Sep 17 00:00:00 2001 From: Guillaume Horel Date: Wed, 4 Nov 2015 13:55:26 -0500 Subject: move code into a src directory --- src/backup.c | 108 +++++++ src/complaint_sender.c | 158 +++++++++ src/complaints.c | 579 +++++++++++++++++++++++++++++++++ src/file_operations.c | 800 ++++++++++++++++++++++++++++++++++++++++++++++ src/global.c | 35 ++ src/id_map.c | 74 +++++ src/main.h | 189 +++++++++++ src/main.h.in | 189 +++++++++++ src/multicaster.c | 582 +++++++++++++++++++++++++++++++++ src/multicatcher.c | 181 +++++++++++ src/page_reader.c | 426 ++++++++++++++++++++++++ src/parse_synclist.c | 320 +++++++++++++++++++ src/proto.h | 182 +++++++++++ src/rtt.c | 258 +++++++++++++++ src/rttcatcher.c | 118 +++++++ src/rttcomplaint_sender.c | 103 ++++++ src/rttcomplaints.c | 270 ++++++++++++++++ src/rttmain.h | 126 ++++++++ src/rttmissings.c | 93 ++++++ src/rttpage_reader.c | 188 +++++++++++ src/rttproto.h | 116 +++++++ src/rttsends.c | 144 +++++++++ src/sends.c | 329 +++++++++++++++++++ src/set_catcher_mcast.c | 142 ++++++++ src/set_mcast.c | 160 ++++++++++ src/setup_socket.c | 242 ++++++++++++++ src/signal.c | 93 ++++++ src/signal.h | 32 ++ src/timing.c | 109 +++++++ src/trFilelist.c | 449 ++++++++++++++++++++++++++ 30 files changed, 6795 insertions(+) create mode 100644 src/backup.c create mode 100644 src/complaint_sender.c create mode 100644 src/complaints.c create mode 100644 src/file_operations.c create mode 100644 src/global.c create mode 100644 src/id_map.c create mode 100644 src/main.h create mode 100644 src/main.h.in create mode 100644 src/multicaster.c create mode 100644 src/multicatcher.c create mode 100644 src/page_reader.c create mode 100644 src/parse_synclist.c create mode 100644 src/proto.h create mode 100644 src/rtt.c create mode 100644 src/rttcatcher.c create mode 100644 src/rttcomplaint_sender.c create mode 100644 src/rttcomplaints.c create mode 100644 src/rttmain.h create mode 100644 src/rttmissings.c create mode 100644 src/rttpage_reader.c create mode 100644 src/rttproto.h create mode 100644 src/rttsends.c create mode 100644 src/sends.c create mode 100644 src/set_catcher_mcast.c create mode 100644 src/set_mcast.c create mode 100644 src/setup_socket.c create mode 100644 src/signal.c create mode 100644 src/signal.h create mode 100644 src/timing.c create mode 100644 src/trFilelist.c (limited to 'src') diff --git a/src/backup.c b/src/backup.c new file mode 100644 index 0000000..3c59a9d --- /dev/null +++ b/src/backup.c @@ -0,0 +1,108 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include /* use POSIX in order to be portable to linux */ + +extern int verbose; +int backup = FALSE; +char *pattern_baseDir = NULL; + +/* --------- for syncing all files with/without backup. */ +int nPattern=0; /* number of regular expression for backup files */ +regex_t ** fPatterns;/* array of pointers to regular expression */ + +int set_nPattern(char * fpat_file) +{ + FILE *fd; + char pat[PATH_MAX]; + int count = 0; + if ((fd = fopen(fpat_file, "r"))==NULL) { + fprintf(stderr, "Cannot open file -- %s\n", fpat_file); + return FAIL; + } + while (!feof(fd)) { + if (fscanf(fd, "%s", pat) == 1) { + ++count; + } + } + nPattern = count; + fclose(fd); + return SUCCESS; +} + +int read_backup_pattern(char * fpat_file) +{ + FILE *fd; + char pat[PATH_MAX]; + int i = 0; + + if (!set_nPattern(fpat_file)) return FAIL; + + fPatterns = malloc(sizeof(void *)*nPattern); + for(i = 0; i< nPattern; ++i) { + fPatterns[i] = (regex_t *) malloc(sizeof(regex_t)); + } + + fd = fopen(fpat_file, "r"); + + /* + if we don't prepend ^, + then pattern 'file' intended for files under srcBase + will select files with pattern = subdir/file.* + which is not our intention. + */ + i = 0; + while (!feof(fd)) { + if (fscanf(fd, "%s", pat) == 1) { + char fullpat[PATH_MAX]; + sprintf(fullpat, "^%s", pat); + + regcomp(fPatterns[i], fullpat, REG_EXTENDED|REG_NOSUB); + ++i; + } + }; + fclose(fd); + return SUCCESS; +} + +int needBackup(char * filename) /* fullpath */ +{ + /* we reach this point when the backup flag is true + if no_pattern -> backup all of them + if pattern + if match -> backup + nomatch -> no-backup + */ + int i; + char *p; + if (!backup) return FALSE; + if (nPattern==0) return TRUE; + + p = filename + strlen(pattern_baseDir) + 1; /* +1 to get pass the / after basedir */ + /* fprintf(stderr, "nPattern = %d file= %s\n", nPattern, p); ********/ + for(i=0; i + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" + +/* complaint_send_socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +extern int my_FLOW_PORT; +extern int verbose; + +int seq = 0; + +/* send buffer */ +char complaint_buffer[FLOW_BUFFSIZE]; +int *ccode_ptr; /* complain code ---- see main.h */ +int *cmid_ptr; /* which machine*/ +int *cfile_ptr; /* which file -- for missing page */ +int *npage_ptr; /* # of pages -- for missing page */ +int *pArray_ptr; /* missing page arrary */ +int *fill_ptr; /* point to next array element */ + +/* ---------------------------------------------------------------- + routines to fill in pArray with the missing page indexes + --------------------------------------------------------------- */ +void fill_in_int(int i) +{ + *fill_ptr++ = htonl(i); +} + +void init_fill_ptr() +{ + fill_ptr = pArray_ptr; +} + +/*---------------------------------------------------------- + init_complaint_sender initializes the buffer to allow the + catcher to send complaints back to the sender. + + ret_address of sender to whom we will complain + is determined when we receive the first UDP data + in read_handle_page() in page_reader.c + ----------------------------------------------------------*/ +void init_complaint_sender() /* (struct sockaddr_in *ret_addr) */ +{ + /* ret_addr is sent by master, in network-byte-order */ + if (verbose>=2) + fprintf(stderr, "in init_complaint_sender\n"); + /* init the send_socket */ + complaint_fd = complaint_socket(&complaint_addr, my_FLOW_PORT); + + /* set up the pointers so we know where to put complaint_data */ + ccode_ptr = (int *) complaint_buffer; + cmid_ptr = (int *)(ccode_ptr + 1); + cfile_ptr = (int *)(cmid_ptr + 1); + npage_ptr = (int *)(cfile_ptr + 1); + pArray_ptr= (int *)(npage_ptr + 1); +} + +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin_addr); +} +#else +void update_complaint_address(struct sockaddr_in6 *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin6_addr); +} +#endif + +/*------------------------------------------------------------------------ + send_complaint fills the complaint buffer and send it through our socket + back to the sender + + The major use is to tell master machine which pages of which file + needs to be re-transmitted. + complaint -- the complain code defined in main.h + mid -- machine id + file -- the file index + npage -- # of missing pages + followed by an array of missing page index [ page_1, page_2, ... ] + + It is also used for sending back acknoledgement. + complaint -- the ack code defined in main.h in the same complaint section. + mid -- machine id + file -- which file + page -- seq number (out of seq complaints will be ignored by the catcher) + ------------------------------------------------------------------------*/ +void send_complaint(int complaint, int mid, int page, int file) +{ + /* fill in the complaint data */ + /* 20060323 add converting to network byte-order before sending out */ + int bytes; + *ccode_ptr = htonl(complaint); + *cmid_ptr = htonl(mid); + *cfile_ptr = htonl(file); + if (complaint==MISSING_PAGE || complaint==MISSING_TOTAL) { + *npage_ptr = htonl(page); + } else { + *npage_ptr = htonl(seq++); + } + + bytes = (complaint==MISSING_PAGE) ? ((char*)fill_ptr - (char*)ccode_ptr) + : (char*)pArray_ptr - (char*)ccode_ptr; + + /* send it */ + if(sendto(complaint_fd, complaint_buffer, bytes, 0, + (const struct sockaddr *)&complaint_addr, + sizeof(complaint_addr)) < 0) { + perror("Sending complaint\n"); + } + if (verbose>=2) + printf("Sent complaint:code=%d mid=%d page=%d file=%d bytes=%d\n", + complaint, mid, page, file, bytes); +} + + + + + + + + diff --git a/src/complaints.c b/src/complaints.c new file mode 100644 index 0000000..fb48da2 --- /dev/null +++ b/src/complaints.c @@ -0,0 +1,579 @@ +/* + Copyright (C) 2005-2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +/*#include "paths.h"*/ +#include +#include +#include +#include + +extern int monitorID; /* defined in multicaster.c */ +extern int my_FLOW_PORT; +extern int verbose; +extern int nPages; +extern char * cmd_name[]; +extern unsigned int total_pages, real_total_pages; +extern off_t total_bytes, real_total_bytes; + +/* buffer for receiving complaints */ +char flow_buff[FLOW_BUFFSIZE]; +int *code_ptr; /* What's wrong? */ +int *mid_ptr; /* machine id */ +int *file_ptr; /* which file */ +int *npage_ptr; /* # of pages */ +int *pArray_ptr;/* missing page arrary */ + +/* receive socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +/* status */ +char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */ +int *last_seq; /* array of size nMachines -- seq number of complaints */ + /* *** watch out the max seq = 2e9 */ +int *total_missing_page; /* array of size nMachines -- persistent thru life of program*/ +char *file_received; /* array of size nMachines */ +char *bad_machines; /* array of size nMachines -- persistent thru life of program*/ +char *machine_status; /* array of size nMachines for ack */ +int *missing_pages; /* array of size nMachines */ + +int nMachines; +int has_missing; /* some machines have missing pages for this file (a flag)*/ +int has_sick; /* some machines are sick for this file (a flag)*/ +int skip_count=0; /* number of files that are not delivered */ +int quitWithOneBad=FALSE; /* default: continue with one or more (=2) + fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE); + + /* get pointers set to the right place in buffer */ + code_ptr = (int *) flow_buff; + mid_ptr = (int *) (code_ptr + 1); + file_ptr = (int *) (mid_ptr + 1); + npage_ptr = (int *) (file_ptr + 1); + pArray_ptr= (int *) (npage_ptr+1); + + /* Receive socket (the default buffer size is 65535 bytes */ + if (verbose>=2) printf("set up receive socket for complaints\n"); + complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT); + + /* + getsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &i, &il); + printf(" rcvbuf = %d type = %d\n", i, il); + exit(0); + the default in our machines -> size = 65535 and type = 4 + */ + + rcv_size = TOTAL_REC_PAGE * FLOW_BUFFSIZE; + if (setsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){ + perror("Expanding receive buffer for init_complaints"); + } +} + +void init_missing_page_flag(int n) +{ + int i; + nPages = n; + if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) { + fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n); + perror("error = "); + exit(0); + } + for(i=0; i= (unsigned int) nMachines) || /* boundary check for mid_v for safety */ + (bad_machines[mid_v] == BAD_MACHINE)) { /* ignore complaint from a bad machine*/ + return 0; + } + + code_v = ntohl(*code_ptr); + file_v = ntohl(*file_ptr); + npage_v = ntohl(*npage_ptr); + + /* check if the complaint is for the current file */ + if (code_v != MONITOR_OK && file_v != current_entry()) return 0; + /* out of seq will be ignored */ + if (code_v != MISSING_PAGE && code_v != MISSING_TOTAL) { /********* MISSING_TOTAL ? *************/ + if (npage_v <= last_seq[mid_v]) return 0; + else last_seq[mid_v] = npage_v; + } + + switch (code_v) { + case PAGE_RECV: + /******** check if machineID is the one we have set. */ + /*if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID);*/ + if (cmd == SENDING_DATA && mid_v == monitorID) + return 1; + else + return 0; + + case MONITOR_OK: + /********* check if machineID is the one we have set. */ + if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID); + if (cmd == SELECT_MONITOR_CMD && mid_v == monitorID) + return 1; + else + return 0; + + case OPEN_OK : + if (cmd == OPEN_FILE_CMD) { + machine_status[mid_v] = MACHINE_OK; + return 1; + } else { + return 0; + } + + case CLOSE_OK : + if (cmd == CLOSE_FILE_CMD || cmd == CLOSE_ABORT_CMD) { + machine_status[mid_v] = MACHINE_OK; + return 1; + } else { + return 0; + } + + case EOF_OK : + if (cmd == EOF_CMD && file_received[mid_v]==NOT_RECV) { + machine_status[mid_v] = MACHINE_OK; + file_received[mid_v] = FILE_RECV; + return 1; + } else { + return 0; + } + + case MISSING_PAGE : + if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0; + if (npage_v > nPages) return 0; + { + int i, *pi, page_v; + pi = pArray_ptr; + for (i = 0; i nPages) continue; /*** make sure page_v starts with 1*/ + missing_page_flag[page_v-1] = MISSING; + } + } + missing_pages[mid_v] += npage_v; + set_has_missing(); + return 1; + + case MISSING_TOTAL: + if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV || machine_status[mid_v] == MACHINE_OK) + return 0; + /* Consider to add: if npage_v >missing_pages[mid_v], ask to resend + [ likely no big gain ] */ + total_missing_page[mid_v] += npage_v; + set_has_missing(); /* store the info about missing info */ + machine_status[mid_v] = MACHINE_OK; /* machine_status serves as ack only */ + return 1; + + case SIT_OUT : + if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0; + fprintf(stderr, "*** %s sits-out-receiving %s\n", + id2name(mid_v), getFilename()); + machine_status[mid_v] = MACHINE_OK; + + if (!has_sick) ++skip_count; + set_has_sick(); + return 1; + + default : + if (verbose>=2) fprintf(stderr, "Unknown complaint: code = %d\n", code_v); + return 0; + } /* end of switch */ + } /* end of if(readable) */ + + /* time out of readable() */ + return -1; +} + +int all_machine_ok() +{ + int i; + for(i=0; i= ACK_WAIT_PERIOD) { + ++count; + if (count < my_ACK_WAIT_TIMES) { + if (verbose>=1 && (count % 10 == 0)) + fprintf(stderr, " %d: resend cmd(%s) to machines:[ ", count, cmd_name[code]); + for(i=0; i=1 && (count % 10 == 0)) fprintf(stderr, "%d ", i); + send_cmd(code, (int) i); + usleep(FAST); + } + } + if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "]\n"); + rtime0 = rtime1; + } else { /* allowable period of time has passed */ + fprintf(stderr, " Drop these bad machines:[ "); + for(i=0; i0) { + fprintf(stderr, "\nWarning: There are %d files which are not delivered.\n", skip_count); + exit_code = -1; + } + + fprintf(stderr, "\nTotal number of files = %12d Pages w/o ack = %12u (%6.2f%%)\n", + total_entries(), pages_wo_ack(), (double)pages_wo_ack()/(double)real_total_pages*100.0); + + dp = real_total_pages - total_pages; + fprintf(stderr, "Total number of pages = %12d Pages re-sent = %12u (%6.2f%%)\n", + total_pages, dp, (double)dp/(double)total_pages*100.0); + + delta = (off_t)(real_total_bytes - total_bytes); + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "Total number of bytes = %12llu Bytes re-sent = %12llu (%6.2f%%)\n", + total_bytes, delta, (double)delta/(double)total_bytes*100.0); + #else + fprintf(stderr, "Total number of bytes = %12d Bytes re-sent = %12u (%6.2f%%)\n", + total_bytes, delta, (double)delta/(double)total_bytes*100.0); + #endif + + return (exit_code); +} + +/* count the number of bad machines */ +int nBadMachines() +{ + int i, count = 0; + for(i=0; i 0) fprintf(stderr, "]\n"); + return count; +} + +int send_done_and_pr_msgs(double total_time, double t_page) +{ + int exit_code1 =0; + int exit_code2 =0; + int exit_code3 =0; + + send_all_done_cmd(); + + /* exit_code1 !=0 if there are files that were not delivered due to change or skipped */ + exit_code1 = pr_missing_pages(); + + fprintf(stderr, "Total time spent = %6.2f (min) ~ %6.2f (min/GB)\n\n", + total_time, total_time / ((double)real_total_bytes/1.0e9)); + fprintf(stderr, "Send pages time = %6.2f (min) ~ %6.2f (min/GB)\n\n", + t_page, t_page / ((double)real_total_bytes/1.0e9)); + + exit_code2 = choose_print_machines(bad_machines, + BAD_MACHINE, + "Not synced for bad machines:[ "); + + if (quitWithOneBad && nBadMachines() >=1) { + fprintf(stderr, "We choose to exit when at least one target is bad\n"); + fprintf(stderr, "All files following the current one did not get delivered\n"); + fprintf(stderr, "If resend cmd(CLOSE_FILE), then the current file may have been delivered to non-bad targets\n\n"); + } + + if (current_entry() < total_entries()) { /* if we exit prematurely */ + exit_code3 = choose_print_machines(machine_status, + NOT_READY, "\nNot-ready machines:[ "); + } + + if (verbose>=1) pr_rtt_hist(); + return (exit_code1+exit_code3); /* 200807 removed exit_code2 because bad machines case has been dealt with + by -q. If no -q, then the bad machines are considered 'harmless' */ +} + +/* to do some cleanup before exit IF all machines are bad */ +void do_badMachines_exit() +{ + if ((quitWithOneBad && nBadMachines() < 1) || + (!quitWithOneBad && (nBadMachines() < nMachines))) return; + + if (quitWithOneBad) + fprintf(stderr, "One (or more) machine is bad. Exit!\n"); + else + fprintf(stderr, "All machines are bad. Exit!\n"); + + send_done_and_pr_msgs(-1.0, -1.0); + exit(-1); +} + +void do_cntl_c(int signo) +{ + fprintf(stderr, "Control_C interrupt detected!\n"); + + send_done_and_pr_msgs(-1.0, -1.0); + exit(-1); +} diff --git a/src/file_operations.c b/src/file_operations.c new file mode 100644 index 0000000..d9a8733 --- /dev/null +++ b/src/file_operations.c @@ -0,0 +1,800 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + This file contains all the file-operations for multicatcher. + 20060404: + found a undetected omission in open_file(). + AFter lseek(), we should write a dummy byte so that + multicatcher.zzz has the right file size to start with. + Otherwise, it will grow as syncing progresses. + + Port the code to deal with Large_files. + esp in write_page(), + lseek(fout, (off_t)(page-1)*(off_t)PAGE_SIZE, SEEK_SET) + 200603: + Remove the meta-data operation. + Each file's info (stat) is transfered to targets during + the OPEN_FILE_CMD. extract_file_info() in this file + is to get that stat info for the current entry(file). + + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + Previously, memory mapped file was used for file IO + but later was changed to simple open() and lseek(), write(). + This was also echoed in a patch by Clint Byrum . + + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was originally called wish_list.c + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include +#include "main.h" + +extern int verbose; +extern int machineID; + +char * baseDir = NULL; +char * missingPages=NULL; /* starting address of the array of flags */ +int fout; /* file discriptor for output file */ + +int toRmFirst = FALSE; /* remove existing file first and then sync */ +/* off_t total_bytes_written;*/ +unsigned int nPages; +int had_done_zero_page; +int current_file_id = 0; /* 1, 2, 3 ... or -1, -2 ... for backup */ +int backup; /* flag for a file that needs backup when deletion */ +char *backup_suffix = NULL; +char my_backup_suffix[] = "_mcast_bakmmddhhmm"; + +/* file stat_info which is transmitted from master */ +mode_t stat_mode; +nlink_t stat_nlink; +uid_t stat_uid; +gid_t stat_gid; +off_t stat_size; +time_t stat_atime; +time_t stat_mtime; + +char filename[PATH_MAX]; +char linktar[PATH_MAX]; +char fullpath[PATH_MAX]; +char tmp_suffix[L_tmpnam]; + +void default_suffix() +{ + time_t t; + struct tm tm; + time(&t); + localtime_r(&t, &tm); + sprintf(my_backup_suffix, "_mcast_bak%02d%02d%02d%02d", + tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min); + backup_suffix = my_backup_suffix; + return; +} + +void get_tmp_suffix() +{ + /* this is called once in each multicatcher */ + char tmp[L_tmpnam]; + tmpnam_r(&tmp[0]); + strcpy(tmp_suffix, basename(tmp)); +} + +int make_backup() +{ + char fnamebak[PATH_MAX]; + if (strlen(fullpath) + strlen(backup_suffix) > (PATH_MAX-1)) { + fprintf(stderr, "backup filename too long\n"); + return FAIL; + }; + + if (!backup) return SUCCESS; /* if not match with pattern, skip the backup */ + + sprintf(fnamebak, "%s%s", fullpath, backup_suffix); + /* + The backup scheme is as follows. + ln file file.bak (mv file file.bak would cause file to non-exist for a short while) + mv file.new file + */ + if (link(fullpath, fnamebak) != 0) { + if (errno != ENOENT && errno != EINVAL) { + fprintf(stderr,"hardlink %s => %s : %s\n",fullpath, fnamebak, strerror(errno)); + return FAIL; + } + } + if (verbose >= 2) { + fprintf(stderr, "backed up %s to %s\n",fullpath, fnamebak); + } + return SUCCESS; +} + +void get_full_path(char * dest, char * sub_path) +{ + /* prepend the sub_path with baseDir --> dest */ + strcpy(dest, baseDir); + strcat(dest, "/"); + strcat(dest, sub_path); +} + +void get_tmp_file(char * tmp) +{ + /*char *fncopy;*/ + /* ******* change to filename_mcast.fileabc%$&? */ + /* fncopy = strdup(filename); dirname change the string content */ + strcpy(tmp, baseDir); + strcat(tmp, "/"); + strcat(tmp, filename); + strcat(tmp, "_"); + strcat(tmp, TMP_FILE); + strcat(tmp, tmp_suffix); + /* free(fncopy); */ +} + +int my_unlink(const char *fn) +{ + if (verbose>=2) + fprintf(stderr, "deleting file: %s\n", fn); + if (unlink(fn) != 0) { + if (errno==ENOENT) { + return SUCCESS; + } else { + /* NOTE: unlink() could not remove files which do not have w permission!*/ + /* resort to shell command */ + char cmd[PATH_MAX]; + sprintf(cmd, "rm -f %s", fn); + if (system(cmd)!=0) { + fprintf(stderr, "'rm -f' fails for %s\n", fn); + return FAIL; + } + } + } + return SUCCESS; +} + +int my_touch(const char*fn) +{ + int fo; + if( (fo = open(fn, O_RDWR | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) < 0) { + perror(fn); + return FAIL; + } + + /* set the size of the output file */ + if(lseek(fo, 0, SEEK_SET) == -1) { + fprintf(stderr, "cannot seek for %s\n", fn); + perror(fn); + return FAIL; + } + close(fo); + return SUCCESS; +} + +int extract_file_info(char * buf, int n_file, unsigned int n_pages) +{ + /* + Along with OPEN_FILE_CMD, the data area in rec_buf contains + (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0 + where the stat_string contains the buf in + sprintf(buf, "%lu %lu %lu %lu %lu %lu %lu", st.st_mode, st.st_nlink, + st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime); + */ + char * pc = &buf[0]; + + #ifdef _LARGEFILE_SOURCE + if (sscanf(pc, "%u %u %u %u %llu %lu %lu %d", &stat_mode, &stat_nlink, + &stat_uid, &stat_gid, &stat_size, &stat_atime, &stat_mtime, + &toRmFirst) != 8) + return FAIL; + #else + if (sscanf(pc, "%u %u %u %u %lu %lu %lu %d", &stat_mode, &stat_nlink, + &stat_uid, &stat_gid, &stat_size, &stat_atime, &stat_mtime, + &toRmFirst) != 8) + return FAIL; + #endif + + /* fprintf(stderr, "size= %llu\n", stat_size); *********/ + + pc += (strlen(pc) + 1); + strcpy(filename, pc); + get_full_path(fullpath, filename); + + linktar[0] = '\0'; + if (S_ISLNK(stat_mode) || stat_nlink > 1) { /* if it is a softlink or hardlink */ + pc += (strlen(pc) +1); + strcpy(linktar, pc); + } + + nPages = n_pages; + current_file_id = n_file; + backup = (current_file_id < 0); + had_done_zero_page = FAIL; + /*total_bytes_written = 0;*/ + return SUCCESS; +} + +int open_file() +{ + int i; + + /* + sometimes for disk space reason, it is necessary + to first remove the file and sync. + If toReplace is true, the backup option should be off. + */ + if (toRmFirst) { + if (!my_unlink(fullpath)) { + fprintf(stderr, "Replacing "); + perror(filename); + return FAIL; + } + } + + /* fprintf(stderr, "%d %d %d\n", stat_mode, stat_nlink, stat_size); *********/ + if (S_ISREG(stat_mode) && stat_nlink == 1) { + /* if it is a regular file and not a hardlink */ + char tmpFile[PATH_MAX]; + get_tmp_file(tmpFile); + + my_unlink(tmpFile); /* make sure it's not there from previous runs */ + + if( (fout = open(tmpFile, O_RDWR | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) < 0) { + fprintf(stderr, "cannot open() %s for writing. \n", tmpFile); + perror(tmpFile); + return FAIL; + } + + /* set the size of the output file (see Steve's book on page 411) */ + if(lseek(fout, stat_size - 1 , SEEK_SET) == -1) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "lseek() error for %s with size = %llu\n", tmpFile, stat_size); + #else + fprintf(stderr, "lseek() error for %s with size = %u\n", tmpFile, (unsigned int)stat_size); + #endif + perror(tmpFile); + close(fout); + return FAIL; + } + if (write(fout, "", 1) != 1) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "write() error for %s with size = %llu\n", tmpFile, stat_size); + #else + fprintf(stderr, "write() error for %s with size = %u\n", tmpFile, (unsigned int)stat_size); + #endif + perror(tmpFile); + close(fout); + return FAIL; + } + } + + /* init missingPages flags */ + if (!missingPages) free(missingPages); + missingPages = malloc(sizeof(char) * nPages); + for(i=0; i < nPages; ++i) missingPages[i] = MISSING; + + if (verbose>=2) fprintf(stderr, "Ready to receive file = %s\n", filename); + return SUCCESS; +} + +int close_file() +{ + /* delete missingPages flags which was malloc-ed in open_file() */ + free(missingPages); + missingPages = NULL; + + if (S_ISREG(stat_mode) && stat_nlink == 1) { + /* if it is a regular file and not a hardlink */ + char tmpFile[PATH_MAX]; + struct stat stat; + + if ((fout != -1) && (close(fout) != 0)) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %llu \n", + filename, stat_size); + #else + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %u \n", + filename, (unsigned int)stat_size); + #endif + perror("close"); + return FAIL; + } + fout = -1; /* if the following fails, the reentry wont do munmap() */ + + /* the real work */ + /* get_full_path(oldFile, filename); **** unnecessary fullpath is the oldFile */ + + get_tmp_file(tmpFile); + + /* 8/14/2002 + If there was a hardware (disk IO) problem + the sync should not proceed. + ** Add the following checking. + */ + if (lstat(tmpFile, &stat)<0) { + perror("ERROR: close_file() cannot lstat the tmp file\n"); + return FAIL; + } + + if (backup && !make_backup(fullpath)) { /* make a hardlink oldFile => backup_file */ + fprintf(stderr, "fail to make backup for %s\n", fullpath); + return FAIL; + } + if (rename(tmpFile, fullpath)<0) { + perror("ERROR: close_file():rename() \n"); + return FAIL; + } + + /* 20071016 in rare occasion, the written file has not the right size */ + if (lstat(fullpath, &stat)<0) { + fprintf(stderr, "ERROR: close_file() cannot lstat %s\n", fullpath); + return FAIL; + } + if (stat_size != stat.st_size) { + fprintf(stderr, "ERROR: close_file() filesize != incoming-size\n"); + return FAIL; + } + } + + /* for debug + if (verbose>=2) fprintf(stderr, "total bytes written %llu for file %s\n", + total_bytes_written, filename); + */ + + return SUCCESS; +} + +int rm_tmp_file() +{ + char tmpFile[PATH_MAX]; + + if ((fout != -1) && (close(fout) != 0)) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %llu \n", + filename, stat_size); + #else + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %u \n", + filename, (unsigned int)stat_size); + #endif + perror("rm_tmp"); + return FAIL; + } + fout = -1; + + get_tmp_file(tmpFile); + + return (my_unlink(tmpFile)); +} + +int nPages_for_file() +{ + return nPages; +}; + +/* return total number of missing pages */ +int get_missing_pages() +{ + int i, result=0; + + for(i=0; i < nPages; ++i) + if ((missingPages[i]) == MISSING) ++result; + return result; +} + +int is_missing(int index) +{ + return (missingPages[index] == MISSING) ? TRUE : FALSE; +} + +void page_received(int index) +{ + missingPages[index] = RECEIVED; +} + +/* + write() in write_page() may block forever. + This function is to check if write() is ready. +*/ +int writable(int fd) +{ + struct timeval write_tv; + fd_set wset; + FD_ZERO(&wset); + FD_SET(fd, &wset); + + write_tv.tv_sec = WRITE_WAIT_SEC; + write_tv.tv_usec = WRITE_WAIT_USEC; + return (select(fd + 1, NULL, &wset, NULL, &write_tv)==1); +} + +void write_page(int page, char *data_ptr, int bytes) +{ + /* page = page number starting with 1 */ + if (page < 1 || page > nPages) return; + + /* Do we need to write this page? */ + if (is_missing(page-1)){ + if (!writable(fout)) return; + + /* Write the data */ + if (lseek(fout, (off_t)(page-1)*(off_t)PAGE_SIZE, SEEK_SET)<0) { + if (verbose>=1) { + fprintf(stderr, "ERROR: write_page():lseek() at page %d for %s\n", + page, filename); + perror("ERROR"); + } + return; + } + if (write(fout, data_ptr, bytes)<0) { + /* write IO error !!! */ + perror("ERROR"); + fprintf(stderr, "write_page():write() error: at page %d for %s\n", page, filename); + return; + } + + /* Mark the page as received in our wish list */ + page_received(page-1); + /*total_bytes_written += bytes;*/ + } else { + /* If we don't need to write it, just return */ + if (verbose >=2) { + fprintf(stderr, "Already have page %d for %d:Ignoring\n", page, current_file_id); + } + } + return; +} + +/* For files whose size is 0 */ +int touch_file() +{ + if (verbose >=2) + fprintf(stderr, "touching file: %s\n", fullpath); + + my_unlink(fullpath); + return my_touch(fullpath); + /* system() + VERY time in-efficient + char cmd[PATH_MAX]; + sprintf(cmd, "touch %s", fullpath); + return (system(cmd)==0); + */ +} + +int delete_file(int to_check_dir_type) +{ + struct stat st; + char fp[PATH_MAX]; + int trailing_slash; + int type_checking; + + strcpy(fp, fullpath); + + /* remove trailing slash if any -- for deletion-'type' checking */ + if (to_check_dir_type) { + char *pc; + type_checking = TRUE; + pc = &fp[0] + strlen(fp) - 1; + if (*pc=='/') { + *pc = '\0'; + trailing_slash = TRUE; + } else { + trailing_slash = FALSE; + }; + } else { + type_checking = FALSE; + } + + if(lstat(fp, &st) < 0) { + /* already gone ? */ + return SUCCESS; + } + if (S_ISREG(st.st_mode) || S_ISLNK(st.st_mode)) { /* delete a file or link */ + if (verbose>=2) + fprintf(stderr, "deleting file: %s\n", fp); + + if (type_checking && trailing_slash) { /* intended to remove a directory when it is not */ + return FAIL; + } + + if (backup && S_ISREG(st.st_mode) && !make_backup(fp)) {/* backup regular file */ + return FAIL; /* failed to make_backup */ + } + return (my_unlink(fp)); + } else if (S_ISDIR(st.st_mode)) { /* remove a directory */ + char cmd[PATH_MAX]; + if (verbose>=2) + fprintf(stderr, "deleting directory: %s\n", fp); + + if (type_checking && (!trailing_slash)) { /* intended to remove a non-dir when it is directory */ + return FAIL; + } + + sprintf(cmd, "rm -rf %s", fp); /* remove everything in dir, watch out for this */ + return (system(cmd)==0); + } + /* not file, link, directory */ + fprintf(stderr, "unrecognized file_mode for %s\n", fp); + return FAIL; +} + +/* send complaints to the master for missing data */ +int ask_for_missing_page() +{ + int i, n=0, total=0; + + /* + Send missing page indexes if any + */ + init_fill_ptr(); + for(i=0; i < nPages; ++i) { + if (missingPages[i] == MISSING ) { + ++n; + ++total; + if (n > MAX_NPAGE) { + /* send previous missing page-indexes */ + send_complaint(MISSING_PAGE, machineID, MAX_NPAGE, current_file_id); + init_fill_ptr(); + n = 1; + } + /* fill in one page index */ + fill_in_int(i+1); /* origin = 1 */ + } + } + /* send the rest of missing pages complaint */ + if (n>0) send_complaint(MISSING_PAGE, machineID, n, current_file_id); + + return total; /* there are missing pages */ +} + +void missing_page_stat() +{ + int i, n=0, sum=0, last=-1; + + for(i=0; i < nPages; ++i) { + if (missingPages[i] == MISSING ) { + ++n; + if (last<0) { + sum += i; + } else { + sum += (i - last); + } + last = i; + } + } + if (n>0) { + double a = sum; + double b = n; + fprintf(stderr, "file= %d miss= %d out-of %d avg(delta_index) = %f\n", + current_file_id, n, nPages, a/b); + } +} + +void my_perror(char * msg) +{ + char fn[PATH_MAX]; + sprintf(fn, "%s - %s", fullpath, msg); + perror(fn); +} + +int set_owner_perm_times() +{ + int state = SUCCESS; + + /* set owner */ + if (lchown(fullpath, stat_uid, stat_gid)!=0) { + my_perror("chown"); + state = FAIL; + } + + /* + set time and permission. + Don't try to set the time and permission on a link + */ + if (!S_ISLNK(stat_mode)) { + struct utimbuf times; + if (chmod(fullpath, stat_mode)!=0) { + my_perror("chmod"); + state = FAIL; + } + + times.actime = stat_atime; + times.modtime = stat_mtime; + if (utime(fullpath, ×)!=0) { + my_perror("utime"); + state = FAIL; + } + } + return state; +} + +int update_directory() +{ + struct stat st; + int exists; + char fp[PATH_MAX], *pc; + + if (verbose>=2) + fprintf(stderr, "Updating dir: %s\n", fullpath); + + /* if fullpath is a softlink that points to a dir, + and it has a trailing '/', + lstat() will view it as a directory ! + So, we remove the trailing '/' before lstat() */ + strcpy(fp, fullpath); + pc = &fp[0] + strlen(fp) - 1; + if (*pc=='/') *pc = '\0'; + + if(lstat(fp, &st) < 0) { + switch(errno) { + case ENOENT: + exists = FALSE; + break; + default: + my_perror("lstat"); + return FAIL; + } + } else { + exists = TRUE; + } + + if (!exists) { + /* There's nothing there, so create dir */ + if (mkdir(fp, stat_mode) < 0){ + my_perror("mkdir"); + return FAIL; + } + return SUCCESS; + } else if (!S_ISDIR(st.st_mode)) { + /* If not a directory delete what is there */ + if (unlink(fp)!=0){ + my_perror("unlink"); + return FAIL; + } + if (verbose>=2) + fprintf(stderr, "Deleted file %s to replace with directory\n", fp); + if (mkdir(fp, stat_mode) < 0){ + my_perror("mkdir"); + return FAIL; + } + return SUCCESS; + } else { + /* If dir exists, just chmod */ + chmod(fp, stat_mode); + /*** 20070410: changing mtime of a dir can cause NFS to confuse + See http://lists.samba.org/archive/rsync/2004-May/009439.html + So, I comment it out in the following + + struct utimbuf times; + times.actime = stat_atime; + times.modtime = stat_mtime; + ***/ + /* + if (utime(fullpath, ×) < 0) { + my_perror("utime"); + } + */ + return SUCCESS; + } +} + +int update_directory0() +{ + DIR *d; + + struct utimbuf times; + times.actime = stat_atime; + times.modtime = stat_mtime; + + if (verbose>=2) + fprintf(stderr, "Creating dir: %s\n", fullpath); + + d = opendir(fullpath); + if (d == NULL) { + switch(errno) { + case ENOTDIR: + /* If not a directory delete what is there */ + if (unlink(fullpath)!=0){ + my_perror("unlink"); + return FAIL; + } + if (verbose>=2) + fprintf(stderr, "Deleted file %s to replace with directory\n", fullpath); + /* Fall through to ENOENT */ + case ENOENT: + /* There's nothing there, so create dir */ + if (mkdir(fullpath, stat_mode) < 0){ + my_perror("mkdir"); + return FAIL; + } + return SUCCESS; + default: + my_perror("opendir"); + return FAIL; + } + } else { + /* If dir exists, just chmod */ + closedir(d); + chmod(fullpath, stat_mode); + /*** 20070410: changing mtime of a dir can cause NFS to confuse + See http://lists.samba.org/archive/rsync/2004-May/009439.html + So, I comment it out in the following ***/ + /* + if (utime(fullpath, ×) < 0) { + my_perror("utime"); + } + */ + return SUCCESS; + } +} + +int check_zero_page_entry() +{ + /* when total_pages = 0, the entry can be + an empty file + softlink (hardlink), + a directory + */ + if (had_done_zero_page) return SUCCESS; /* to avoid doing it again */ + + if (S_ISDIR(stat_mode)) { /* a directory */ + if (!update_directory()) { + had_done_zero_page = FAIL; + return FAIL; + } + } else if (S_ISLNK(stat_mode)) { /* Is it a softlink? */ + if (verbose>=2) + fprintf(stderr, "Making softlink: %s -> %s\n", fullpath, linktar); + delete_file(FALSE); /* remove the old one at fullpath */ + if (symlink(linktar, fullpath) < 0) { + my_perror("symlink"); + had_done_zero_page = FAIL; + return FAIL; + } + } else if (stat_nlink > 1) { /* hardlink */ + char fn[PATH_MAX]; + get_full_path(fn, linktar); /* linktar is a relative path from synclist */ + if (verbose>=2) + fprintf(stderr, "Making a hardlink: %s => %s\n", fullpath, fn); + my_unlink(fullpath); /* remove the old one */ + if (link(fn, fullpath)!=0) { + my_perror("link"); + had_done_zero_page = FAIL; + return FAIL; + } + } else { + /* it must be a regular file */ + + if (!touch_file()) { + had_done_zero_page = FAIL; + return FAIL; + } else { + set_owner_perm_times(); + } + } + had_done_zero_page = SUCCESS; + return SUCCESS; +} + diff --git a/src/global.c b/src/global.c new file mode 100644 index 0000000..64104eb --- /dev/null +++ b/src/global.c @@ -0,0 +1,35 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +char *cmd_name[] = { "TIME-OUT", + "TEST", + "SEND_DATA", + "RESEND_DATA", + "OPEN_FILE", + "EOF", + "CLOSE_FILE", + "CLOSE_ABORT", + "ALL_DONE", + "SEL_MONITOR", + "NULL"}; + +int verbose = 0; /* = 0 little detailed output, = 1,2 ... = n a lot more details */ +int machineID = -1; /* used for multicatcher */ + diff --git a/src/id_map.c b/src/id_map.c new file mode 100644 index 0000000..8b8dcaa --- /dev/null +++ b/src/id_map.c @@ -0,0 +1,74 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include +#include +#include +#include + +void strip(char * str); + +/* place to hold the array of string */ +char ** machine_names = NULL; /* array of (char*) */ +int nTargets=0; + +void get_machine_names(char * filename) +{ + FILE *fd; + char line[PATH_MAX]; + int count=0; + + if ((fd = fopen(filename, "r")) == NULL) { + fprintf(stderr, "Cannot open file -- %s \n", filename); + return; + } + while (fgets(line, PATH_MAX, fd) != NULL) { + strip(line); + if (strlen(line) != 0) ++count; + } + if (count == 0) { + fclose(fd); + fprintf(stderr, "No machine names in the file = %s\n", filename); + return; + } + + nTargets = count; + + rewind(fd); + machine_names = malloc(nTargets * sizeof(void*)); + + line[0] = '\0'; + count = 0; + while(fgets(line, PATH_MAX, fd) != NULL) { + strip(line); + if (strlen(line)==0) continue; + machine_names[count] = (char*)strdup(line); + line[0] = '\0'; + ++count; + } + fclose(fd); +} + +char * id2name(int id) +{ + return (machine_names) ? machine_names[id] : ""; +} diff --git a/src/main.h b/src/main.h new file mode 100644 index 0000000..48f8c55 --- /dev/null +++ b/src/main.h @@ -0,0 +1,189 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + Following the suggestion and the patch by Clint Byrum , + I added more control to selectively print out messages. + The control is done by the statement 'if (version >= n)' + + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __main_h +#define __main_h + +#include +#include +#include +#include +#include +#include +#include +#include +#include /* sockaddr_in{} and other Internet defns */ +#include /* inet(3) functions */ +#include +#include /* for nonblocking */ +#include +#include +#include +#include +#include +#include /* for S_xxx file mode constants */ +#include /* for iovec{} and readv/writev */ +#include +#include +#include /* timeval{} for select() */ +#include +#include + +#define VERSION "4.0.1" + +/* logic values */ +#define FALSE 0 +#define TRUE 1 +#define FAIL (FALSE) +#define SUCCESS (TRUE) +#define GOOD_EXIT 0 +#define BAD_EXIT -1 + +/* Ports and addresses */ +#define PORT 8888 /* for multicast */ +#define FLOW_PORT (PORT-1) /* for flow-control */ +#define MCAST_ADDR "239.255.67.92" +#define MCAST_TTL 1 +#define MCAST_LOOP FALSE +#define MCAST_IF NULL + +/* + Handling socket's receive buffer on the target machine: + if the available data size in the receiveing buffer is larger + than TOO_MUCH then a TOO_FAST complaint is triggered. + The master will then sleep for USEC_TO_IDLE + Currently, this is not effective. +*/ +#define TOO_FAST_LIMIT (TOTAL_REC_PAGE / 2) /* if half is full, then too fast */ +#define TOO_MUCH (TOO_FAST_LIMIT * PAGE_BUFFSIZE) +#define USEC_TO_IDLE 1000000 + +/* TIMING stuff */ +#define FAST 5000 /* usec */ +#define DT_PERPAGE 6000 /* usec */ +#define FACTOR 90 /* interpage interval = FACTOR * DT_PERPAGE or DT_PERPAGE*/ +#define SECS_FOR_KILL 30 /* time(sec) allowed for 'kill -9 pid' to finish */ + +/* time for the master to wait for the acknowledgement */ +#define ACK_WAIT_PERIOD 1 /* secs (from time()); */ +#define ACK_WAIT_TIMES 60 /* wait for this many periods */ + +#define SICK_RATIO (0.9) +#define SICK_THRESHOLD (50) /* SICK FOR such many TIMES is really sick */ + +/* max wait time for write() a page of PAGE_SIZE -- 100 msec */ +#define WRITE_WAIT_SEC 0 +#define WRITE_WAIT_USEC 100000 + +#define SET_MON_WAIT_TIMES 6000 /* time = this number * FAST */ +#define NO_FEEDBACK_COUNT_MAX 10 +#define SWITCH_THRESHOLD 50 /* to avoid switching monitor too frequently + because of small diff in missing_pages */ + +/* complaints */ +#define TOO_FAST 100 +#define OPEN_OK 200 +#define CLOSE_OK 300 +#define MISSING_PAGE 400 +#define MISSING_TOTAL 500 +#define EOF_OK 600 +#define SIT_OUT 700 +#define PAGE_RECV 800 +#define MONITOR_OK 900 + +/* Sizes */ +/* 20060427: removed size_t which is arch-dependent */ +#define PAGE_SIZE 64512 +#define HEAD_SIZE (sizeof(int) + 2 * sizeof(int) + 2 * sizeof(int)) +#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE) +#define TOTAL_REC_PAGE 20 /* change to 4 in case hit the OS limit in buf size */ + +#define FLOW_HEAD_SIZE (sizeof(int)*4) +#define FLOW_BUFFSIZE (PAGE_SIZE+FLOW_HEAD_SIZE) +#define MAX_NPAGE (PAGE_SIZE / sizeof(int)) +/* + Modes and command codes: + The numerical codes are also the index to retrieve the command names + for printing in complaints.c +*/ +#define TIMED_OUT 0 +#define TEST 1 +#define SENDING_DATA 2 +#define RESENDING_DATA 3 +#define OPEN_FILE_CMD 4 +#define EOF_CMD 5 +#define CLOSE_FILE_CMD 6 +#define CLOSE_ABORT_CMD 7 +#define ALL_DONE_CMD 8 +#define SELECT_MONITOR_CMD 9 +#define NULL_CMD 10 + +/* machine status ----- for caster */ +#define MACHINE_OK_MISSING_PAGES '\2' +#define MACHINE_OK '\1' +#define NOT_READY '\0' + +#define BAD_MACHINE '\1' +#define GOOD_MACHINE '\0' + +#define FILE_RECV '\1' +#define NOT_RECV '\0' + +/* representation of all-targets for sends */ +#define ALL_MACHINES -1 + +/* PAGE STATUS */ +#define MISSING '\0' +#define RECEIVED '\1' + +/* MACHINE STATE ----- for catcher */ +#define IDLE_STATE 0 +#define GET_DATA_STATE 1 +#define DATA_READY_STATE 2 +#define SICK_STATE 3 + +/* + The following two are info to be packed into + meta data to represent either file or directory deletion. +*/ +/* SPECIAL # of PAGES to signal deleting action */ +#define TO_DELETE (-1) + +/* temporary file name prefix for transfering to */ +#define TMP_FILE "mrsync." + +#include "proto.h" + +#endif diff --git a/src/main.h.in b/src/main.h.in new file mode 100644 index 0000000..600c491 --- /dev/null +++ b/src/main.h.in @@ -0,0 +1,189 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + Following the suggestion and the patch by Clint Byrum , + I added more control to selectively print out messages. + The control is done by the statement 'if (version >= n)' + + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __main_h +#define __main_h + +#include +#include +#include +#include +#include +#include +#include +#include +#include /* sockaddr_in{} and other Internet defns */ +#include /* inet(3) functions */ +#include +#include /* for nonblocking */ +#include +#include +#include +#include +#include +#include /* for S_xxx file mode constants */ +#include /* for iovec{} and readv/writev */ +#include +#include +#include /* timeval{} for select() */ +#include +#include + +#define VERSION "@VERSION@" + +/* logic values */ +#define FALSE 0 +#define TRUE 1 +#define FAIL (FALSE) +#define SUCCESS (TRUE) +#define GOOD_EXIT 0 +#define BAD_EXIT -1 + +/* Ports and addresses */ +#define PORT 8888 /* for multicast */ +#define FLOW_PORT (PORT-1) /* for flow-control */ +#define MCAST_ADDR "239.255.67.92" +#define MCAST_TTL 1 +#define MCAST_LOOP FALSE +#define MCAST_IF NULL + +/* + Handling socket's receive buffer on the target machine: + if the available data size in the receiveing buffer is larger + than TOO_MUCH then a TOO_FAST complaint is triggered. + The master will then sleep for USEC_TO_IDLE + Currently, this is not effective. +*/ +#define TOO_FAST_LIMIT (TOTAL_REC_PAGE / 2) /* if half is full, then too fast */ +#define TOO_MUCH (TOO_FAST_LIMIT * PAGE_BUFFSIZE) +#define USEC_TO_IDLE 1000000 + +/* TIMING stuff */ +#define FAST 5000 /* usec */ +#define DT_PERPAGE 6000 /* usec */ +#define FACTOR 90 /* interpage interval = FACTOR * DT_PERPAGE or DT_PERPAGE*/ +#define SECS_FOR_KILL 30 /* time(sec) allowed for 'kill -9 pid' to finish */ + +/* time for the master to wait for the acknowledgement */ +#define ACK_WAIT_PERIOD 1 /* secs (from time()); */ +#define ACK_WAIT_TIMES 60 /* wait for this many periods */ + +#define SICK_RATIO (0.9) +#define SICK_THRESHOLD (50) /* SICK FOR such many TIMES is really sick */ + +/* max wait time for write() a page of PAGE_SIZE -- 100 msec */ +#define WRITE_WAIT_SEC 0 +#define WRITE_WAIT_USEC 100000 + +#define SET_MON_WAIT_TIMES 6000 /* time = this number * FAST */ +#define NO_FEEDBACK_COUNT_MAX 10 +#define SWITCH_THRESHOLD 50 /* to avoid switching monitor too frequently + because of small diff in missing_pages */ + +/* complaints */ +#define TOO_FAST 100 +#define OPEN_OK 200 +#define CLOSE_OK 300 +#define MISSING_PAGE 400 +#define MISSING_TOTAL 500 +#define EOF_OK 600 +#define SIT_OUT 700 +#define PAGE_RECV 800 +#define MONITOR_OK 900 + +/* Sizes */ +/* 20060427: removed size_t which is arch-dependent */ +#define PAGE_SIZE 64512 +#define HEAD_SIZE (sizeof(int) + 2 * sizeof(int) + 2 * sizeof(int)) +#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE) +#define TOTAL_REC_PAGE 20 /* change to 4 in case hit the OS limit in buf size */ + +#define FLOW_HEAD_SIZE (sizeof(int)*4) +#define FLOW_BUFFSIZE (PAGE_SIZE+FLOW_HEAD_SIZE) +#define MAX_NPAGE (PAGE_SIZE / sizeof(int)) +/* + Modes and command codes: + The numerical codes are also the index to retrieve the command names + for printing in complaints.c +*/ +#define TIMED_OUT 0 +#define TEST 1 +#define SENDING_DATA 2 +#define RESENDING_DATA 3 +#define OPEN_FILE_CMD 4 +#define EOF_CMD 5 +#define CLOSE_FILE_CMD 6 +#define CLOSE_ABORT_CMD 7 +#define ALL_DONE_CMD 8 +#define SELECT_MONITOR_CMD 9 +#define NULL_CMD 10 + +/* machine status ----- for caster */ +#define MACHINE_OK_MISSING_PAGES '\2' +#define MACHINE_OK '\1' +#define NOT_READY '\0' + +#define BAD_MACHINE '\1' +#define GOOD_MACHINE '\0' + +#define FILE_RECV '\1' +#define NOT_RECV '\0' + +/* representation of all-targets for sends */ +#define ALL_MACHINES -1 + +/* PAGE STATUS */ +#define MISSING '\0' +#define RECEIVED '\1' + +/* MACHINE STATE ----- for catcher */ +#define IDLE_STATE 0 +#define GET_DATA_STATE 1 +#define DATA_READY_STATE 2 +#define SICK_STATE 3 + +/* + The following two are info to be packed into + meta data to represent either file or directory deletion. +*/ +/* SPECIAL # of PAGES to signal deleting action */ +#define TO_DELETE (-1) + +/* temporary file name prefix for transfering to */ +#define TMP_FILE "mrsync." + +#include "proto.h" + +#endif diff --git a/src/multicaster.c b/src/multicaster.c new file mode 100644 index 0000000..3b26cae --- /dev/null +++ b/src/multicaster.c @@ -0,0 +1,582 @@ +/* + Copyright (C) 2006-2008 Renaissance Technologies Corp. + main developer: HP Wei + version 3.0 major update + -- large file support + -- platform independence (between linux, unix) + -- backup feature (as in rsync) + -- removing meta-file-info + -- catching slow machine as the feedback monitor + -- mcast options + version 3.0.[1-9] bug fixes + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwanranted SIT-OUT cases. + -- tested on Debian 64 bit arch by Nicolas Marot in France + version 3.1.0 + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. + version 3.2.0 + -- monitor change improvement + -- handshake improvement (e.g. seq #) + -- if one machine skips a file, all will NOT close() + version 4.0 major update + -- consolidate sending missing pages in complaint flow + cutting the messages by one order magnitude + -- exit code adjustment + + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#include /* to define PATH_MAX */ +#include +#include +#include + +extern int verbose; +extern char * machine_list_file; /* defined in complaints.c */ +extern char * bad_machines; /* array of size nMachines, defined in complaints.c */ +extern char * file_received; +extern int * missing_pages; +extern int backup; +extern int nPattern; +extern char * pattern_baseDir; +extern int nTargets; +extern int my_ACK_WAIT_TIMES; +extern int toRmFirst; +extern int quitWithOneBad; +extern int skip_count; +extern int file_changed; + +char * my_MCAST_ADDR = MCAST_ADDR; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; +int my_TTL = MCAST_TTL; +int my_LOOP = MCAST_LOOP; +char * my_IFname = MCAST_IF; + +int monitorID = -1; + +int no_feedback_count; + +void usage() +{ + fprintf(stderr, + "multicaster (to copy files to many multicatchers) version %s)\n" + " Option list:\n" + " [ -v ]\n" + " [ -w \n" + " -s \n" + " -f \n" + " -------- options for backup ---------------------------------\n" + " [ -b flag to turn on backup ]\n" + " [ -r for regex patterns for files needing backup ]\n" + " [ -d for regex patterns ]\n" + " -------- mcast options --------------------------------------\n" + " [ -A **same as for multicatcher ]\n" + " [ -P **same as for multicatcher ]\n" + " [ -T ]\n" + " [ -L flag turn on mcast_LOOP. default is off ]\n" + " [ -I ]\n", + VERSION, my_ACK_WAIT_TIMES, MCAST_ADDR, PORT, my_TTL); +} + + +int monitor_cmd(int cmd, int machineID) +{ + /* + Once this fx is called the old monitor will be + turned off. So, we need to make sure this fx + returns TRUE for a machine. Or the monitor_set_up + should fail. + */ + int count =0, resp; + set_delay(0, FAST); + send_cmd(cmd, machineID); + while (1) { /* wait for ack */ + resp = read_handle_complaint(cmd); + if (resp<0) { /* time-out */ + ++count; + send_cmd(cmd, machineID); + if (count > SET_MON_WAIT_TIMES) { + return FALSE; + } + } else if (resp==1) { /* got ack */ + return TRUE; + } + /* irrevelant resp -- continue */ + } +} + +int find_max_missing_machine(char *flags) +{ + /* flags[] -> which machines are not considered */ + int i, index = -1; + int max = -1; + int threshold; + + for(i=0; i max && flags[i] == GOOD_MACHINE) { + max = missing; + index = i; + } + } + + threshold = (get_nPages() > SWITCH_THRESHOLD*100) ? + get_nPages() / 100 : SWITCH_THRESHOLD; + if (index>=0) { + if (monitorID>=0) { + if (flags[monitorID]==BAD_MACHINE) return index; + /** if (max <= (total_missing_page[monitorID] + threshold)) **/ + if (max <= (missing_pages[monitorID] + threshold)) + return monitorID; + } + return index; + } else { /* could come here if all are busy*/ + return -1; + } + + /** + return ((index >= 0) && + (monitorID >= 0) && + (bad_machines[monitorID] == GOOD_MACHINE) && + (max <= (total_missing_page[monitorID] + threshold))) ? + monitorID : index; + **/ +} + +void set_monitor(int mid) +{ + /* one machine is to be set. So need to succeed. */ + if (monitor_cmd(SELECT_MONITOR_CMD, mid)) { + if (verbose >=1) fprintf(stderr, "Monitor - %s\n", id2name(mid)); + return; + } else { + fprintf(stderr, "Fatal: monitor %s cannot be set up!\n", id2name(mid)); + send_done_and_pr_msgs(-1.0, -1.0); + exit(BAD_EXIT); + } +} + +void check_change_monitor(int undesired_index) +{ + /* this function changes the value of the global var: monitorID */ + int i, count; + char * flags; + + /* if all targets received the file, no need to go on */ + if ((count=nNotRecv())==0) return; + + if (count==1) { + i = iNotRecv(); + if (bad_machines[i] == GOOD_MACHINE) { + monitorID = i; + /* + 'i' could be the current monitor. + We'd like to set it because there might + be something wrong with it if we come to + to this point. + */ + set_monitor(monitorID); + } + return; + } + + /* more than two machines do not receive the file yet */ + /* flags mark those machines as BAD which we don't want to consider */ + flags = malloc(nTargets * sizeof(char)); + for(i=0; i=1) fprintf(stderr, "Monitor = %s\n", id2name(monitorID)); + break; + } else { + flags[monitorID] = BAD_MACHINE; + /* Then, we attemp to set up other machine */ + } + ++count; + } + + free(flags); + if (monitorID < 0) { + fprintf(stderr, "Fatal: monitor machine cannot be set up!\n"); + send_done_and_pr_msgs(-1.0, -1.0); + exit(BAD_EXIT); + } +} + +void do_one_page(int page) +{ + int resp; + unsigned long rtt; + refresh_timer(); + start_timer(); + if (!send_page(page)) return; + + /* first ignore all irrelevant resp */ + resp=read_handle_complaint(SENDING_DATA); + while (resp==0) { + resp=read_handle_complaint(SENDING_DATA); + } + + /* read_handle_complaint() waits n*interpage_interval at most */ + if (resp==-1) { + /* delay_sec for readable() is set by set_delay() */ + /****** + at this point, the readable() returns without getting a reply + from monitorID after FACTOR*DT_PERPAGE (or DT_PERPAGE if without_monitor) + ****/ + ++no_feedback_count; + if (verbose>=2) printf("no reply, count = %d\n", no_feedback_count); + update_rtt_hist(999999); + /* register this page as rtt = infinite --- the last element in rtt_hist */ + + if (no_feedback_count > NO_FEEDBACK_COUNT_MAX) { + /* switch to another client */ + if (verbose >=2) + fprintf(stderr, + "Consecutive non_feedback exceeds limit, Changing monitor machine.\n"); + /* if (nTargets>1 && (nTargets - nBadMachines()) > 1 && nNotRecv() > 1) */ + check_change_monitor(monitorID); /* replace the current monitor */ + no_feedback_count = 0; + } + return; + } else { /* resp == 1 */ + end_timer(); + update_time_accumulator(); + rtt = get_accumulated_usec(); + /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */ + /* to do: update histogram */ + if (verbose >=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt); + update_rtt_hist(rtt); + + no_feedback_count = 0; + } +} + +void send_cmd_and_wait_ack(int cmd_code) +{ + send_cmd(cmd_code, (int) ALL_MACHINES); + refresh_machine_status(); + /*set_delay(0, FAST);*/ + set_delay(0, DT_PERPAGE*FACTOR); + if (cmd_code==EOF_CMD) mod_machine_status(); + wait_for_ok(cmd_code); + do_badMachines_exit(); + /* check_change_monitor(-1); */ +} + +int do_file_changed_skip() +{ + /* if file is changed during syncing, then we should skip this file */ + if (file_changed || !same_stat_for_file()) { + fprintf(stderr, "WARNING: file is changed during sycing -- skipping\n"); + send_cmd_and_wait_ack(CLOSE_ABORT_CMD); + free_missing_page_flag(); + ++skip_count; + return TRUE; + } + return FALSE; +} + +int main(int argc, char *argv[]) +{ + int c; + int cfile, ctotal_pages, cpage; + char * source_path = NULL; + char * synclist_path = NULL; + char * machine_list_file = NULL; + time_t tloc; + time_t time0, time1, t_page0, t_page; + + while ((c = getopt(argc, argv, "v:w:A:P:T:LI:m:s:f:br:d:Xq")) != EOF) { + switch (c) { + case 'v': + verbose = atoi(optarg); + break; + case 'w': + my_ACK_WAIT_TIMES = atoi(optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'T': + my_TTL = atoi(optarg); + break; + case 'L': + my_LOOP = TRUE; + break; + case 'I': + my_IFname = optarg; + break; + case 'm': + machine_list_file = optarg; + break; + case 's': + source_path = optarg; + break; + case 'f': + synclist_path = optarg; + break; + case 'b': + backup = TRUE; /* if nPattern==0, backup means back up ALL files */ + break; + case 'r': /* to selectively back up certain files as defined in the pattern */ + if (!read_backup_pattern(optarg)) { + fprintf(stderr, "Failed in loading regex patterns in file = %s\n", optarg); + exit(BAD_EXIT); + } + break; + case 'd': + pattern_baseDir = strdup(optarg); + if (pattern_baseDir[strlen(pattern_baseDir)-1]=='/') + pattern_baseDir[strlen(pattern_baseDir)-1] = '\0' ; /* remove last / */ + break; + case 'X': + toRmFirst = TRUE; + break; + case 'q': + quitWithOneBad = TRUE; + break; + case '?': + usage(); + exit(BAD_EXIT); + } + } + + if (!machine_list_file || !source_path || !synclist_path ) { + fprintf(stderr, "Essential options (-m -s -f) should be specified. \n"); + usage(); + exit(BAD_EXIT); + } + + if (nPattern>0) backup = TRUE; + if (backup && nPattern>0) { + if (!pattern_baseDir) pattern_baseDir = strdup(source_path); + if (strlen(source_path) < strlen(pattern_baseDir) || + strncmp(source_path, pattern_baseDir, strlen(pattern_baseDir))!=0) { + fprintf(stderr, + "src_path (%s) should include (and be longer than) pattern_baseDir (%s)", + source_path, pattern_baseDir); + exit(BAD_EXIT); + } + } + + if (backup && toRmFirst) { + fprintf(stderr, "-B and -X cannot co-exist\n"); + exit(BAD_EXIT); + } + + get_machine_names(machine_list_file); + if (nTargets==0) { + fprintf(stderr, "No target to sync to\n"); + exit(GOOD_EXIT); + } + + if (!init_synclist(synclist_path, source_path)) exit(BAD_EXIT); + + if (total_entries()==0) { + fprintf(stderr, "Nothing to sync in %s\n", synclist_path); + exit(GOOD_EXIT); + } + + if (verbose >= 2) + fprintf(stderr, "Total number of files: %d\n", total_entries()); + + /* init the network stuff and some flags */ + init_sends(); + init_complaints(); + init_machine_status(nTargets); + + /* set up Cntl_C catcher */ + Signal(SIGINT, do_cntl_c); + + /* ------------------- set up monitor machine for doing feedback for each page sent */ + check_change_monitor(-1); + + /*-------------------------------------------------------------------------------*/ + + init_rtt_hist(); + time0 = time(&tloc); /* start time */ + t_page = 0; /* total time for sending pages */ + + /* -----------------------------Send the file one by one -----------------------------------*/ + for (cfile = 1; cfile <= total_entries(); cfile++) { /* for each file to be synced */ + if (!get_next_entry(cfile)) continue; + + ctotal_pages = pages_for_file(); + + /* + By the time this file, which was obtained when synclist was + established some time ago, may no longer exist on the master. + So, we need to check the existence of this file. + fexist() also opens the file so that it won't be deleted + between here and the send-page-loop. + */ + if (ctotal_pages > 0 && (!same_stat_for_file() || + !fexist(current_entry()))) { + /* go to next file if this file has changed or does not exist */ + fprintf(stderr, "%s (%d out of %d; Extinct file)\n", + getFilename(), current_entry(), total_entries()); + adjust_totals(); + continue; + } + + if (ctotal_pages < 0) { + fprintf(stderr, "%s (%d out of %d; to delete)\n", + getFilename(), current_entry(), total_entries()); + } else { + fprintf(stderr, "%s (%d out of %d; %d pages)\n", + getFilename(), current_entry(), total_entries(), ctotal_pages); + } + + /* send_open_cmd */ + pack_open_file_info(); + send_cmd_and_wait_ack(OPEN_FILE_CMD); + + /* + ctotal_pages < 0, for deletion + ctotal_pages = 0, regular file with no content. + or directory, softlink, hardlink + both should have been finished with OPEN_FILE_CMD + */ + if (ctotal_pages <= 0) continue; + + /* for other regular files */ + init_missing_page_flag(ctotal_pages); + refresh_missing_pages(); /* total missing pages for this file for each tar */ + + /* ----- sending file data ----- first round */ + t_page0 = time(&tloc); + no_feedback_count = 0; + for (cpage = 1; cpage <= ctotal_pages; cpage++) { + /* + the mode field and delay may be changed by change_monitor + */ + set_delay(0, DT_PERPAGE*FACTOR); + set_mode(SENDING_DATA); + do_one_page(cpage); + } + + if (do_file_changed_skip()) continue; + + /* send "I am done with the first round" */ + reset_has_missing(); + refresh_file_received(); /* to record machines that have received this file */ + send_cmd_and_wait_ack(EOF_CMD); + + /* after the first run, before we go to 2nd and 3rd run, */ + if (has_missing_pages()) check_change_monitor(-1); + + /* ----- sending file data again, 2nd and 3rd and ...n-th round */ + reset_has_sick(); + while (has_missing_pages()) { + int c; /****************/ + no_feedback_count = 0; + + c = 0; + for (cpage = 1; cpage <= ctotal_pages; cpage++){ + if (is_it_missing(cpage-1)) { + set_delay(0, DT_PERPAGE*FACTOR); + set_mode(RESENDING_DATA); + do_one_page(cpage); + page_sent(cpage-1); + ++c; /*************/ + } + } + if (verbose>=1) + fprintf(stderr, "re-sent N_pages = %d\n", c); /*************/ + + /* eof */ + reset_has_missing(); + send_cmd_and_wait_ack(EOF_CMD); + if (has_sick_machines()) { + break; + /* one machine can reach sick_state while some others are still + in missing_page state. + This break here is ok in terms of skipping this file.*/ + } else { + check_change_monitor(-1); + } + }; + + t_page += (time(&tloc) - t_page0);; + if (do_file_changed_skip()) continue; + + /* close file */ + send_cmd_and_wait_ack((has_sick_machines()) ? CLOSE_ABORT_CMD : CLOSE_FILE_CMD); + if (has_sick_machines()) { + fprintf(stderr, "Skip_syncing %s\n", getFilename()); + } + free_missing_page_flag(); + } /* end of the for each_file loop */ + + time1= time(&tloc); + return send_done_and_pr_msgs( ((double)(time1 - time0))/ 60.0, ((double)t_page)/60.0); +} + diff --git a/src/multicatcher.c b/src/multicatcher.c new file mode 100644 index 0000000..76fbc5e --- /dev/null +++ b/src/multicatcher.c @@ -0,0 +1,181 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + verision 3.0 major update + -- large file support + -- platform independence (between linux, unix) + -- backup feature (as in rsync) + -- removing meta-file-info + -- catching slow machine as the feedback monitor + -- mcast options + version 3.0.[1-9] bug fixes + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwanranted SIT-OUT cases. + -- tested on Debian 64 bit arch by Nicolas Marot in France + version 3.1.0 + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. + version 3.2.0 + -- monitor change improvement + -- handshake improvement (e.g. seq #) + -- if one machine skips a file, all will NOT close() + + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" + +extern int machineID; +extern int verbose; +extern int isMonitor; +extern char * baseDir; +extern char * cmd_name[]; +extern char * backup_suffix; + +char * my_MCAST_ADDR = MCAST_ADDR; +char * my_IFname = MCAST_IF; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; + +void usage() +{ + fprintf(stderr, + "multicatcher (to receive files from multicaster) version %s\n" + " Option list:\n" + " [ -v ]\n" + " -------- essential options ----------------------------------\n" + " -t \n" + " -i \n" + " -------- options for backup ---------------------------------\n" + " [ -u for backup files if -b is on in multicaster ]\n" + " -------- mcast options --------------------------------------\n" + " [ -A **same as for multicaster ]\n" + " [ -P **same as for multicaster ]\n" + " [ -I ]\n", + VERSION, MCAST_ADDR, PORT); +} + +int main(int argc, char *argv[]) +{ + int old_mode; /* hp: from char to int for mode */ + int mode; + int c; + + while ((c = getopt(argc, argv, "v:A:P:t:i:u:I:")) != EOF) { + switch (c) { + case 'v': + verbose = atoi(optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'I': + my_IFname = optarg; + break; + case 't': + baseDir = optarg; + break; + case 'i': + machineID = atoi(optarg); + break; + case 'u': + backup_suffix = strdup(optarg); + break; + case '?': + usage(); + exit(BAD_EXIT); + } + } + + if (machineID < 0 || !baseDir) { + fprintf(stderr, "Essential options (-t -i) should be specified. \n"); + usage(); + exit(BAD_EXIT); + } + + fprintf(stderr, "my_pid= %lu\n", getpid()); + + if (!backup_suffix) default_suffix(); + get_tmp_suffix(); /* get a unique tmp_name for the tmp file */ + + init_page_reader(); + init_complaint_sender(); + + /* initialize random numbers */ + srand(time(NULL) + getpid()); + + /* set the timeout for readable() to be about 3 to 6 seconds + Actually, this setting is arbitrary. + The timeout of readable() does not play a role in + the logic flow. + */ + set_delay( 3 + rand() % 6, 0); + mode = old_mode = TEST; + + /* -----------------------The main loop--------------------------- + Multicatcher simply waits for any incoming UDP, + reads and handles it. + If the UDP contains file content, it is placed in the right place. + If the UDP contains an instruction, it is carried out. + + Multicatcher never complains unless being told so. + For example, as it is now, multicatcher does not complain + about the rate of incoming UDP being too fast to handle. + If multicatcher cannot keep up with the speed, it just + loses certain pages in a file which will be reported + later when multicaster requests acknowledgement. + ---------------------------------------------------------------- */ + while(1) { /* loop for all incoming pages */ + if (verbose>=2) + fprintf(stderr, "Starting listen loop with mode %d, old_mode = %d\n", mode, old_mode); + + /* the major task here */ + mode = read_handle_page(); + if (verbose>=2) fprintf(stderr, "new page in mode %d\n", mode); + + if (mode == ALL_DONE_CMD) break; + + /* for debugging purpose */ + if ((old_mode != mode)) { + if (verbose>=2 && mode <= 5) fprintf(stderr, "%s\n", cmd_name[mode]); + } + + /* got no data? */ + if (mode == TIMED_OUT) { + if (verbose>=2) fprintf(stderr, "*"); + } + + old_mode = mode; + } /* end of incoming page loop */ + + if (verbose>=1) fprintf(stderr, "Done!\n"); + return 0; +} diff --git a/src/page_reader.c b/src/page_reader.c new file mode 100644 index 0000000..8f349a0 --- /dev/null +++ b/src/page_reader.c @@ -0,0 +1,426 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" + +/* the following is needed on Sun but not on linux */ +#ifdef _SUN +#include +#endif + +extern int machineID; +extern int verbose; +extern char * my_MCAST_ADDR; /* defined in multicatcher.c */ +extern char * my_IFname; +extern int my_PORT; +extern unsigned int current_file_id; + +int isMonitor; /* flag = if this target machine is a designated monitor */ +int nPage_recv; /* counter for the number of pages received for a file */ +int machineState; /* there are four states during one file transmission */ +int isFirstPage=TRUE; /* flag */ + +/* The followings are used to determine sick condition */ +int current_missing_pages; +int last_missing_pages; +int sick_count; + +/* receive socket */ +int recfd; +#ifndef IPV6 +struct sockaddr_in rec_addr; +#else +struct sockaddr_in6 rec_addr; +#endif + +/* + Receive buffer for storing the data obtained from UDP + The format: + (5*sizeof(int) bytes header) + (PAGE_SIZE data area) + + The header has five int_type (4 bytes) int's. + (1) mode -- for master to give instructions to the target machines. + (2) current file index (starting with 1) + (3) current page index (starting with 1) + (4) bytes that has been sent in this UDP page + (5) total number of pages. + + data_ptr points to the data area. +*/ +int *mode_ptr; /* hp: change from char to int */ +int *total_pages_ptr; +int *current_page_ptr; +int *bytes_sent_ptr, *current_file_ptr; +char *data_ptr; +char rec_buf[PAGE_BUFFSIZE]; + +void init_page_reader() +{ + /*struct ip_mreq mreq;*/ + int rcv_size; + + machineState = IDLE_STATE; + isMonitor = FALSE; + + /* Prepare buffer pointers */ + mode_ptr = (int*)rec_buf; + current_file_ptr = (int *)(mode_ptr + 1); + current_page_ptr = (int *)(current_file_ptr + 1); + bytes_sent_ptr = (int *)(current_page_ptr + 1); + total_pages_ptr = (int *)(bytes_sent_ptr + 1); + data_ptr = (char *)(total_pages_ptr + 1); + + /* Set up receive socket */ + if (verbose>=2) fprintf(stderr, "setting up receive socket\n"); + recfd = rec_socket(&rec_addr, my_PORT); + + /* Join the multicast group */ + /*inet_pton(AF_INET, MCAST_ADDR, &(mreq.imr_multiaddr.s_addr)); + mreq.imr_multiaddr.s_addr = inet_addr(my_MCAST_ADDR); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + + if (setsockopt(recfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void*)&mreq, sizeof(mreq)) < 0){ + perror("Joining Multicast Group"); + } + */ + + if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) { + perror("Joining Multicast Group"); + } + + /* Increase socket receive buffer */ + rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE; + if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){ + perror("Expanding receive buffer for page_reader"); + } + +} + +/* + This is the heart of multicatcher. + It parses the incoming pages and do proper reactions according + to the mode (command code) encoded in the first 4 bytes in an UDP page. + It returns the mode. +*/ +int read_handle_page() +{ + #ifndef IPV6 + struct sockaddr_in return_addr; + #else + struct sockaddr_in6 return_addr; + #endif + int bytes_read; + socklen_t return_len = (socklen_t)sizeof(return_addr); + int mode_v, bytes_sent_v, total_pages_v, current_file_v, current_page_v; + + /* -----------receiving data -----------------*/ + if (readable(recfd) == 1) { /* there is data coming in */ + /* check_queue(); This might be useful. But need more study. */ + /* get data */ + bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0, + (struct sockaddr *)&return_addr, + (socklen_t*) &return_len); + + bytes_sent_v = ntohl(*bytes_sent_ptr); + if (bytes_read != (bytes_sent_v + HEAD_SIZE)) + return NULL_CMD; + + /* convert from network byte order to host byte order */ + mode_v = ntohl(*mode_ptr); + total_pages_v = ntohl(*total_pages_ptr); + current_file_v = ntohl(*current_file_ptr); + current_page_v = ntohl(*current_page_ptr); + + if (isFirstPage) { + update_complaint_address(&return_addr); + isFirstPage = FALSE; + } + + /* --- process various commands (modes) */ + switch (mode_v) { + case TEST: + /* It is just a test packet? */ + fprintf(stderr, "********** Received test packet **********\n"); + return mode_v; + + case SELECT_MONITOR_CMD: + if (current_page_v == machineID) { + isMonitor = TRUE; + send_complaint(MONITOR_OK, machineID, 0, 0); + } else { + isMonitor = FALSE; + } + return mode_v; + + case OPEN_FILE_CMD: + if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) + && machineState == IDLE_STATE) { + /* get info about this file */ + if (verbose>=1) + fprintf(stderr, "open file id= %d\n", current_file_v); + if (!extract_file_info(data_ptr, current_file_v, total_pages_v)) + return mode_v; + + /* different tasks here */ + /* open file, rmdir, unlink */ + if (total_pages_v < 0) { /* delete a file or a directory */ + if (!delete_file(TRUE)) return mode_v; /* this fx can be re-entered many times */ + /* machineState remains to be IDLE_STATE */ + } else if (total_pages_v == 0) { + /* handle an empty file, or (soft)link or directory */ + if (!check_zero_page_entry()) return mode_v; /* can re-enter many times */ + /* machineState remains to be IDLE_STATE */ + } else { /* a regular file */ + if (!open_file()) return mode_v; + /* the file has been opened. */ + sick_count = 0; + current_missing_pages =0; + last_missing_pages = nPages_for_file(current_file_v); + machineState = GET_DATA_STATE; + } + send_complaint(OPEN_OK, machineID, 0, current_file_id); /* ack */ + nPage_recv = 0; + return mode_v; + } + /* + We must be in GET_DATA_STATE. + OPEN_OK ack has been sent back in the previous block. + However, + the master may not have received the ack. + In that case the master will send back the open_file_cmd again. + */ + if ((current_page_v == (int) ALL_MACHINES ||current_page_v == (int) machineID) + && (machineState == GET_DATA_STATE)) { /****/ + send_complaint(OPEN_OK, machineID, 0, current_file_id); + } + return mode_v; + + case EOF_CMD: + /**********/ + if (verbose>=1) + fprintf(stderr, "***** EOF received for id=%d state=%d id=%d, file=%d\n", + current_page_v, machineState, machineID, current_file_v); + + /* the following happnens when this machine was previously out-of-pace + and was labeled as 'BAD MACHINE' by the master. + The master has proceeded with the syncing process without + waiting for this machine to finish the process in one of the previous files. + Since under normal condition, this machine should not expect to see + current_file changes except when OPEN_FILE_CMD is received. + */ + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + + /* normal condition */ + if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) + && machineState == GET_DATA_STATE) { /* GET_DATA_STATE */ + /* check missing pages and send back missing-page-request */ + current_missing_pages = ask_for_missing_page(); /* = total # of missing_pages */ + + if (current_page_v == (int) machineID) { + /* master is asking for my EOF_ack */ + if (current_missing_pages == 0) { + /* w/o assuming how we get to this point ... */ + machineState = DATA_READY_STATE; + send_complaint(EOF_OK, machineID, 0, current_file_id); + } else { + send_complaint(MISSING_TOTAL, machineID, + current_missing_pages , current_file_id); + missing_page_stat(); /* this has to be done before close_file() ******************/ + } + return mode_v; + } + + /*** + master is asking everyone, (after master sent or re-sent pages) + so, we do some book-keeping procedures (incl state change) + ***/ + if (verbose >=1) + fprintf(stderr, "missing_pages = %d, nPages_received = %d file = %d\n", + current_missing_pages, nPage_recv, current_file_v); /************/ + + nPage_recv = 0; + + if (current_missing_pages == 0) { + /* + There is no missing page. + Change the state. + */ + machineState = DATA_READY_STATE; + send_complaint(EOF_OK, machineID, 0, current_file_id); + return mode_v; + } else { + /* + There are missing pages. + If we still miss many pages for SICK_THRESHOLD consecutive times, + then we are sick. e.g. machine CPU does not give multicatcher + enough time to process incoming UDP's OR the disk I/O is too slow. + */ + + if ((SICK_RATIO)*(double)last_missing_pages < (double)current_missing_pages) { + ++sick_count; + if (sick_count > SICK_THRESHOLD) { + machineState = SICK_STATE; + send_complaint(SIT_OUT, machineID, + 0, current_file_id); /* no more attempt to receive */ + /* master may send more pages from requests from other machines + but this machine will mark this file as 'sits out receiving' */ + } else { + /* not sick enough yet */ + send_complaint(MISSING_TOTAL, machineID, + current_missing_pages, current_file_id); + missing_page_stat(); /* this has to be done before close_file() ******************/ + /* master will send more pages */ + } + } else { /* we are getting enough missing pages this time to keep up */ + sick_count = 0; /* break the consecutiveness */ + send_complaint(MISSING_TOTAL, machineID, + current_missing_pages, current_file_id); + missing_page_stat(); /* this has to be done before close_file() ******************/ + /* master will send more pages */ + } + last_missing_pages = current_missing_pages; + return mode_v; + } + } /* end GET_DATA_STATE */ + + /* After state change, we still get request for ack. + send back ack again */ + if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)) { + switch (machineState) { + case DATA_READY_STATE: + send_complaint(EOF_OK, machineID, + 0, current_file_id); + return mode_v; + case SICK_STATE: + send_complaint(SIT_OUT, machineID, + 0, current_file_id); /* just an ack, even for sick state*/ + return mode_v; + } + } + return mode_v; + + case CLOSE_FILE_CMD: + if (verbose>=1) + fprintf(stderr, "***** CLOSE received for id=%d state=%d id=%d, file=%d\n", + current_page_v, machineState, machineID, current_file_v); + + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + + if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) { + if (machineState == DATA_READY_STATE) { + if (!close_file()) { return mode_v; }; + set_owner_perm_times(); + machineState = IDLE_STATE; + send_complaint(CLOSE_OK, machineID, 0, current_file_id); + return mode_v; + } else if (machineState == IDLE_STATE) { + /* send ack back again because we are asked */ + send_complaint(CLOSE_OK, machineID, 0, current_file_id); + return mode_v; + } else { /* other states -- we should not be here*/ + /* if (machineState == SICK_STATE || machineState == GET_DATA_STATE) */ + /* SICK_STATE --> we are too slow in getting missing pages + if one of the machines is sick, master will send out CLOSE_ABORT + GET_DATA_STATE --> + We are not supposed to be in GET_DATA_STATE, + so consider it a sick_state */ + fprintf(stderr, "*** should not be here -- state=%d\n", machineState); + if (!rm_tmp_file()) { return mode_v; }; + machineState = IDLE_STATE; + send_complaint(SIT_OUT, machineID, 0, current_file_id); + /* make sick_count larger than threshold for GET_DATA_STATE */ + sick_count = SICK_THRESHOLD + 10000; + return mode_v; + } + } + return mode_v; + + case CLOSE_ABORT_CMD: + if (verbose>=1) + fprintf(stderr, "***** CLOSE_ABORT received for id=%d state=%d id=%d, file=%d\n", + current_page_v, machineState, machineID, current_file_v); + + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + + if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) { + if (!rm_tmp_file()) { return mode_v; }; + machineState = IDLE_STATE; + send_complaint(CLOSE_OK, machineID, 0, current_file_id); + } + return mode_v; + + case SENDING_DATA: + case RESENDING_DATA: + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + /* + otherwise, go ahead... + */ + if (verbose>=2) { + fprintf(stderr, "Got %d bytes from page %d of %d for file %d mode=%d\n", + bytes_read - HEAD_SIZE, + current_page_v, total_pages_v, + current_file_v + 1, mode_v); + } + + /* timing the disk IO */ + /* start_timer(); */ + + write_page(current_page_v, data_ptr, bytes_read - HEAD_SIZE); + if (isMonitor) send_complaint(PAGE_RECV, machineID, 0, current_file_id); + + /* + end_timer(); + update_time_accumulator(); + */ + + /* Yes, we have just read a page */ + ++nPage_recv; + return mode_v; + + case ALL_DONE_CMD: + /* + clear up the files. + */ + /*** since we do not know if there are other machines + that are NOT in data_ready_state, + to maintain equality, it is best to just + remove tmp_file without close_file() ***/ + rm_tmp_file(); + return mode_v; + + default: + return mode_v; + } /* end of switch */ + } else { + /* No, the read is timed out */ + return TIMED_OUT; + } +} + diff --git a/src/parse_synclist.c b/src/parse_synclist.c new file mode 100644 index 0000000..0c2a7aa --- /dev/null +++ b/src/parse_synclist.c @@ -0,0 +1,320 @@ +/* + Copyright (C) 2006-2008 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#include /* to define PATH_MAX */ + +/* + Get the next to-be-synced file from synclist which is the output of + parseRsyncList.C + The latter in turn parses the output from rsync. + In other words, we use rsync in dry-run mode to get the + files that need to be synced. + + to port to large_file environment: use off_t for size +*/ + +extern int verbose; + +char basedir[PATH_MAX]; /* baseDir for the syncing */ +FILE *fd; +struct stat st; /* stat for the current file */ + +unsigned int nEntries; +int cur_entry; /* file id -- <0 if it needs backup */ +unsigned int nPages; +unsigned int last_bytes; /* number of bytes for the last page */ +int toRmFirst = FALSE; /* flag rm existing file and then sync */ +int file_changed = FALSE; /* flag to indicate if file has been changed during syncing */ + +unsigned int total_pages; +off_t total_bytes; + +int isDelete, isHardlink; +char filename[PATH_MAX]; +char fullname[PATH_MAX]; +char linktar[PATH_MAX]; + +int init_synclist(char * synclist_path, char *bdir) +{ + char line[PATH_MAX]; + nEntries = 0; + strcpy(basedir, bdir); + + if ((fd = fopen(synclist_path, "r")) == NULL) { + fprintf(stderr, "Cannot open synclist file = %s \n", synclist_path); + return FAIL; + } + + while (fgets(line, PATH_MAX, fd) != NULL) nEntries++; + if (nEntries == 0) { + fclose(fd); + fprintf(stderr, "Empty entires in synclist file = %s\n", synclist_path); + return SUCCESS; /* OK, nothing to sync */ + } + rewind(fd); + + cur_entry = 0; + total_pages = 0; + total_bytes = 0; + + return SUCCESS; +} + +/* + pages_for_file calculates the number (int) of pages for the current file + and returns that number in an (int) type. + So, max_pages = 2**31 = 2147483648 + which corresponds to a file_size of 2**31 * 64512 = 1.38e14 + [ general limit = (1<<(sizeof(int)*8)) * PAGE_SIZE ] + At that time, the type of page_number needs to be upgraded :) + + to_delete -> -1 + normal_file -> number_of_pages + softlink -> 0 + hardlink -> 0 + directory -> 0 +*/ +int pages_for_file() +{ + if (isDelete) { /* to be deleted directory or file */ + return TO_DELETE; + } + + if (S_ISREG(st.st_mode)){ + int n; + if (st.st_nlink > 1) return 0; /* hardlink file */ + + n = (int)((st.st_size)/(off_t)PAGE_SIZE); /* regular file */ + if ((st.st_size)%((off_t)PAGE_SIZE) == 0) { + last_bytes = (unsigned int)(PAGE_SIZE); + return n; + } else { + last_bytes = (unsigned int)(st.st_size - (off_t)n * (off_t)PAGE_SIZE); + return n+1; + } + /*return ((st.st_size)%((off_t)PAGE_SIZE) == 0) ? n : n+1 ;*/ + } + if (S_ISLNK(st.st_mode)){ + return 0; /* softlink */ + } + return 0; /* directory */ +} + +off_t bytes_for_file() +{ + return st.st_size; +} + +unsigned int get_nPages() /* for this file */ +{ + return nPages; +} + +void strip(char * str) +{ + /* remove trailing \n and spaces */ + char *pc = &str[strlen(str)-1]; + while (*pc == ' ' || *pc == '\n') { + *(pc--) = '\0'; + } +} + +int same_stat_for_file() +{ + /* check if current stat is same as that when get_next_entry is called */ + struct stat st1; + + if(lstat(fullname, &st1) < 0) { + if (verbose >=1) perror(fullname); + return FAIL; + } + + if (st1.st_size != st.st_size || st1.st_mode != st.st_mode || + st1.st_mtime != st.st_mtime) { + return FAIL; /* the file has changed */ + } + return SUCCESS; + +} + +int is_hardlink_line(char * line) +{ + /* when line is in the form of + string1 string2 + it can be either a filename (string1 string2) + or a hardlink string1 => string2 + */ + struct stat st; + char fn[PATH_MAX]; + strcpy(fn, basedir); + strcat(fn, "/"); + strcat(fn, line); + + /* if the whole line is not a file, then we are dealing with hardlink case */ + return (lstat(fn, &st) < 0); + /* cannot deal with the situation + str1 is a file + str2 is a hardlink to str1 + str1 str2 is a file + AND if we need to sync + str1 and 'str1 str2' at the same time. + But this situation is very rare. */ +} + +int get_next_entry(int current_file_id) +{ + char *c; + char line[PATH_MAX]; + + /* inside this function, cur_entry is set to be positve to facilitate processing */ + cur_entry = current_file_id; /* from main loop's index, starting with 1 */ + + isDelete = FALSE; + isHardlink = FALSE; + + fgets(line, PATH_MAX, fd); + strip(line); + + if (current_file_id == nEntries) { + fclose(fd); /* close the synclist file */ + if (verbose>=2) fprintf(stderr, "no more entry in synclist.\n"); + } + + if (verbose>=2) { + fprintf(stderr, "Got current entry = %d (total= %d)\n", cur_entry, nEntries); + fprintf(stderr, "%s\n", line); + } + + strcpy(fullname, basedir); + strcat(fullname, "/"); + if (strncmp(line, "deleting ", 9)==0) { + isDelete = TRUE; + nPages = -1; + strcat(fullname, &line[9]); + strcpy(filename, &line[9]); + if (needBackup(fullname)) cur_entry = -cur_entry; + return SUCCESS; + } else if ((c = strchr(line, ' '))!=NULL && is_hardlink_line(line)) { + /* is it a hardlink -- two filenames separated by a space */ + char fn[PATH_MAX]; + isHardlink = TRUE; + strncpy(fn, line, (c - line)); + fn[c-line] = '\0'; + strcat(fullname, fn); + strcpy(filename, fn); + strcpy(linktar, c+1); + } else { + /* normal, single entry */ + strcat(fullname, line); + strcpy(filename, line); + } + + /* update stat */ + if(lstat(fullname, &st) < 0) { + if (verbose >=1) perror(fullname); + return FAIL; + } + + if (S_ISLNK(st.st_mode)) { + int linklen; + linklen = readlink(fullname, linktar, PATH_MAX); + /* readlink doesn't null-terminate the string */ + *(linktar + linklen) = '\0'; + } else if (st.st_nlink>1 && !isHardlink) { + /* this is the target file that others (hard)link to. + treat it like a normal file */ + st.st_nlink = 1; + } + + nPages = pages_for_file(); + if (nPages > 0) { /* for regular files */ + total_pages += nPages; + total_bytes += st.st_size; + } + + if (needBackup(fullname)) cur_entry = -cur_entry; + + file_changed = FALSE; + + return SUCCESS; +} + +void adjust_totals() +{ + if (nPages > 0) { + total_pages -= nPages; + total_bytes -= st.st_size; + } +} + +/* some accessors */ +unsigned int total_entries() { return nEntries; } + +int current_entry() { return cur_entry; } + +int is_softlink() { return S_ISLNK(st.st_mode); } + +int is_hardlink() { return isHardlink; } + +int is_directory() { return S_ISDIR(st.st_mode); } + +char * getFilename() { /* relative to basedir */ return &filename[0]; } + +char * getFullname() { return &fullname[0]; } + +/* + The following three fx are used to fill file_info into the send_buffer. + They return the number of bytes being written into the buf, including the \0 byte. +*/ +unsigned int fill_in_stat(char *buf) +{ + /* load into buf area the stat info in ascii format */ + if (isDelete) + sprintf(buf, "0 0 0 0 0 0 0 0"); + else { + #ifdef _LARGEFILE_SOURCE + sprintf(buf, "%u %u %u %u %llu %lu %lu %d", st.st_mode, st.st_nlink, + st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime, + toRmFirst); + #else + sprintf(buf, "%u %u %u %u %lu %lu %lu %d", st.st_mode, st.st_nlink, + st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime, + toRmFirst); + #endif + } + + return strlen(buf)+1; +} + +unsigned int fill_in_filename(char * buf) +{ + strcpy(buf, filename); + return strlen(buf)+1; +} + +unsigned int fill_in_linktar(char *buf) +{ + strcpy(buf, linktar); + return strlen(buf)+1; +} + + diff --git a/src/proto.h b/src/proto.h new file mode 100644 index 0000000..6569d44 --- /dev/null +++ b/src/proto.h @@ -0,0 +1,182 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __main_proto_h +#define __main_proto_h + +/* parse_synclist.c */ +unsigned int total_entries(); +unsigned int fill_in_stat(char *buf); +unsigned int fill_in_linktar(char *buf); +unsigned int fill_in_filename(char * buf); +unsigned int get_nPages(); +int pages_for_file(); +char * getFilename(); +char * getFullname(); +int same_stat_for_file(); +void strip(char * str); +int current_entry(); +int get_next_entry(int current_file_id); +int is_softlink(); +int is_directory(); +int is_hardlink(); +int init_synclist(char * synclist_path, char *bdir); +void adjust_totals(); + +/* backup.c */ +int read_backup_pattern(char * fpat_file); +int needBackup(char * filename); + +/* sends.c */ +void init_sends(); +void set_mode(int new_mode); +int send_page(int page); +void send_test(); +void send_cmd(int code, int machine_id); +void send_all_done_cmd(); +int fexist(int entry) ; +void pack_open_file_info(); +void my_exit(int); + +/* complaints.c */ +void init_complaints(); +int read_handle_complaint(int cmd); +void wait_for_ok(int code); +void refresh_machine_status(); +void refresh_missing_pages(); +void mod_machine_status(); +void refresh_file_received(); +int nNotRecv(); +int iNotRecv(); +int is_it_missing(int page); +int has_missing_pages(); +int has_sick_machines(); +void init_missing_page_flag(int n); +void free_missing_page_flag(); +void refresh_machine_status(); +void init_machine_status(int n); +void page_sent(int page); +int nBadMachines(); +void do_badMachines_exit(); +int pr_missing_pages(); +int send_done_and_pr_msgs(double, double); +void do_cntl_c(int signo); +void set_has_missing(); +void reset_has_missing(); +void set_has_sick(); +void reset_has_sick(); + + +/* setup_socket.c */ +void set_delay(int secs, int usecs); +void get_delay(int * secs, int * usecs); +int readable(int fd); +#ifndef IPV6 +int complaint_socket(struct sockaddr_in *addr, int port); +int send_socket(struct sockaddr_in *addr, char * cp, int port); +int rec_socket(struct sockaddr_in *addr, int port); +#else +int rec_socket(struct sockaddr_in6 *addr, int port); +int send_socket(struct sockaddr_in6 *addr, char * cp, int port); +int complaint_socket(struct sockaddr_in6 *addr, int port); +#endif + +/* set_mcast.c */ +int mcast_set_if(int sockfd, const char *ifname, u_int ifindex); +int mcast_set_loop(int sockfd, int onoff); +int mcast_set_ttl(int sockfd, int val); + +/* set_catcher_mcast.c */ +int Mcast_join(int sockfd, const char *mcast_addr, + const char *ifname, u_int ifindex); +void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr); + +/* complaint_sender.c */ +void fill_in_int(int i); +void init_fill_ptr(); +void send_complaint(int complaint, int mid, int page, int file); +void init_complaint_sender(); +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa); +#else +void update_complaint_address(struct sockaddr_in6 *sa); +#endif + +/* page_reader.c */ +void init_page_reader(); +int check_queue(); +int read_handle_page(); + +/* file_operations.c */ +void get_tmp_suffix(); +int extract_file_info(char * buf, int n_file, unsigned int n_pages); +int open_file(); +int close_file(); +int rm_tmp_file(); +int delete_file(int to_check_dir_type); +int touch_file(); +int nPages_for_file(); +int has_all_pages(); +int ask_for_missing_page(); +void missing_page_stat(); +void write_page(int page, char* data_ptr, int bytes); +int is_missing(int page); +void page_received(int page); +int set_owner_perm_times(); +void close_last_file(); +int check_zero_page_entry(); +void default_suffix(); + +/* timing */ +void refresh_timer(); +double get_accumulated_time(); +void start_timer(); +void end_timer(); +void update_time_accumulator(); +double get_accumulated_usec(); +void update_rtt_hist(unsigned int rtt); +void pr_rtt_hist(); +void init_rtt_hist(); +unsigned int pages_wo_ack(); + +/* signal.c */ +typedef void Sigfunc(int); /* for signal handlers */ +Sigfunc * Signal(int signo, Sigfunc *func); +int Fcntl(int fd, int cmd, int arg); +int Ioctl(int fd, int request, void *arg); +void Sigemptyset(sigset_t *set); +void Sigaddset(sigset_t *set, int signo); +void Sigprocmask(int how, const sigset_t *set, sigset_t *oset); + +/* id_map.c */ +void get_machine_names(char * filename); +char * id2name(int id); + +#endif diff --git a/src/rtt.c b/src/rtt.c new file mode 100644 index 0000000..71c8bd0 --- /dev/null +++ b/src/rtt.c @@ -0,0 +1,258 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +/* + To measure round trip time (RTT) using UDP +*/ + +#include "rttmain.h" +#include /* to define PATH_MAX */ +#include + +char * my_MCAST_ADDR = MCAST_ADDR; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; +int my_TTL = MCAST_TTL; +int my_LOOP = MCAST_LOOP; +char * my_IFname = MCAST_IF; + +int no_feedback_count; +char * machine = NULL; +int remote_pid; +char * reshell = REMOTE_SHELL; +char catcher_path[PATH_MAX]; + +void usage() +{ + fprintf(stderr, + "rtt (to measure rount trip time to a target version %s\n" + " Option list:\n" + " [ -v flag to turn on verbose]\n" + " [ -r ]\n" + " [ -p ]\n" + " -------- essential options ----------------------------------\n" + " -m \n" + " -n \n" + " -s \n" + " -------- mcast options --------------------------------------\n" + " [ -A ]\n" + " [ -P ]\n" + " [ -T ]\n" + " [ -L flag turn on mcast_LOOP. default is off ]\n" + " [ -I ]\n", + VERSION, reshell, catcher_path, my_MCAST_ADDR, my_PORT, my_TTL); +} + +void get_dirname_of_rtt(char *dname, char *rtt_path) +{ + char path[PATH_MAX]; + strcpy(path, rtt_path); /* dirname() will change its argument */ + strcpy(dname, dirname(path)); + + if (strcmp(dname, ".")==0) { + strcpy(dname, getcwd(path, PATH_MAX)); + } +} + +void do_one_page(int page) +{ + unsigned long rtt; + refresh_timer(); + start_timer(); + send_page(page); + /* read_handle_complaint() waits n*interpage_interval at most */ + if (read_handle_complaint()==0) { /* delay_sec for readable() is set by set_delay() */ + /* + At this point, the readable() returns without getting a reply + from the target after n*DT_PERPAGE + This indicates that the page has likely been lost in the network. + */ + if (verbose) fprintf(stderr, "no ack for page = %d\n", page); + ++no_feedback_count; + update_rtt_hist(999999); /* register this as rtt = infinite --- the last element in rtt_hist */ + if (no_feedback_count>NO_FEEDBACK_COUNT_MAX) { /* count the consecutive no_feedback event */ + /* switch to another client */ + fprintf(stderr, "Consecutive non_feedback exceeds limit. Continue with next page...\n"); + no_feedback_count = 0; + } + } else { + end_timer(); + update_time_accumulator(); + rtt = get_accumulated_usec(); + /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */ + /* to do: update histogram */ + if (verbose>=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt); + update_rtt_hist(rtt); + + no_feedback_count = 0; + } +} + +int invoke_catcher(char * machine) +{ + FILE *ptr; + char buf[PATH_MAX]; + + /* invoke rttcatcher on remote machine */ + fprintf(stderr, "using %s to invoke rttcatcher on %s\n", reshell, machine); + + /* check if rsh (ssh) works */ + sprintf(buf, "%s %s date", reshell, machine); + if (verbose) fprintf(stderr, "%s\n", buf); + if (system(buf)) { + fprintf(stderr, "cannot do rsh to the target machine = %s\n", machine); + exit(BAD_EXIT); + } + + if (!my_IFname) { + sprintf(buf, + "%s %s '%s/rttcatcher -A %s -P %d < /dev/null 1>/dev/null 2>/dev/null & echo $!'", + reshell, machine, catcher_path, my_MCAST_ADDR, my_PORT); + } else { + sprintf(buf, + "%s %s '%s/rttcatcher -A %s -P %d -I %s < /dev/null 1>/dev/null 2>/dev/null & echo $!'", + reshell, machine, catcher_path, my_MCAST_ADDR, my_PORT, my_IFname); + } + + + fprintf(stderr, "%s\n", buf); + if ((ptr = popen(buf, "r")) == NULL) { + fprintf(stderr, "Failure to invoke rttcather\n"); + exit(-1); + } + fgets(buf, PATH_MAX, ptr); + pclose(ptr); + + return atoi(buf); +} + +int main(int argc, char *argv[]) +{ + int c; + int nPages =-1, pageSize=-1, ipage; + + verbose = 0; + catcher_path[0] = '\0'; + + while ((c = getopt(argc, argv, "vm:s:n:r:p:A:P:T:LI:")) != EOF) { + switch (c) { + case 'v': + verbose = 1; + break; + case 'r': + reshell = optarg; + break; + case 'p': + strcpy(catcher_path, optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'T': + my_TTL = atoi(optarg); + break; + case 'L': + my_LOOP = TRUE; + break; + case 'I': + my_IFname = optarg; + break; + case 'n' : + nPages = atoi(optarg); + break; + case 'm': + machine = optarg; + break; + case 's': + pageSize = atoi(optarg); + if (pageSize>MAX_PAGE_SIZE) { + usage(); + exit(-1); + } + break; + case '?': + usage(); + exit(-1); + } + } + + if (strlen(catcher_path)==0) { + get_dirname_of_rtt(catcher_path, argv[0]); + } + + + if (nPages == -1 || pageSize == -1 || machine == NULL) { + fprintf(stderr, "Essential options (-n -m -s) should be specified. \n"); + usage(); + exit(-1); + } + + /* init */ + init_sends(pageSize); + init_complaints(); + + /* set up Cntl_C catcher */ + Signal(SIGINT, do_cntl_c); + + remote_pid = invoke_catcher(machine); + fprintf(stderr, "remote pid = %d\n", remote_pid); + + sleep(1); + + /* -------------------Send data-------------------------------------- */ + + init_missing_page_flag(nPages); + + send_cmd(START_CMD, nPages); + refresh_machine_status(); + wait_for_ok(START_CMD); + do_badMachines_exit(machine, remote_pid); + + fprintf(stderr, "Sending data...\n"); + /* send pages */ + set_mode(SENDING_DATA); + set_delay(0, DT_PERPAGE*FACTOR); + + no_feedback_count = 0; + for (ipage = 0; ipage < nPages; ipage++){ + do_one_page(ipage); + } + + send_cmd(EOF_CMD, 0); + refresh_machine_status(); + wait_for_ok(EOF_CMD); + do_badMachines_exit("", -1); + + /* -----------------end of send data -------------------------------- */ + + free_missing_page_flag(); + send_done_and_pr_msgs(); + return 0; +} + diff --git a/src/rttcatcher.c b/src/rttcatcher.c new file mode 100644 index 0000000..1af74f1 --- /dev/null +++ b/src/rttcatcher.c @@ -0,0 +1,118 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + Codes in this file are extracted and modified from multicatcher.c. + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +char * my_MCAST_ADDR = MCAST_ADDR; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; +char * my_IFname = MCAST_IF; + +void usage() +{ + fprintf(stderr, + "rttcatcher (to receive pages on the target. version %s\n" + " Option list:\n" + " [ -v flag to turn on verbose]\n" + " -------- mcast options --------------------------------------\n" + " [ -A ]\n" + " [ -P ]\n" + " [ -I ]\n", + VERSION, MCAST_ADDR, PORT); +} + +int main(int argc, char *argv[]) +{ + int old_mode; /* hp: from char to int for mode */ + int mode; + int c; + + verbose = 0; + while ((c = getopt(argc, argv, "vA:P:I:")) != EOF) { + switch (c) { + case 'v': + verbose = 1; + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'I': + my_IFname = optarg; + break; + case '?': + usage(); + exit(-1); + } + } + + init_page_reader(); + init_complaint_sender(); + + /* initialize random numbers */ + srand(time(NULL) + getpid()); + + /* Wait forever if necessary for first packet */ + set_delay(0, -1); + mode = old_mode = TEST; /* hp: add mode */ + + while(1) { /* loop for all incoming pages */ + if (verbose) + fprintf(stderr, "Starting listen loop with mode %d\n", mode); + + mode = read_handle_page(); + if (verbose) fprintf(stderr, "in mode %d\n", mode); + + if (mode == ALL_DONE_CMD) break; + + /* got no data? */ + if (mode == TIMED_OUT) { + if (verbose) fprintf(stderr, "*"); + continue; + } /* end if TIMED_OUT */ + + /* changing modes? */ + if ((old_mode != SENDING_DATA) && (mode == SENDING_DATA)){ + /* Taking data, wait at least 3 to 8 seconds */ + set_delay( 3 + rand() % 8, 200); + if (verbose) fprintf(stderr, "Receiving data\n"); + old_mode = mode; + continue; + } + + /* all other modes */ + old_mode = mode; + + } /* end of incoming page loop */ + + if (verbose) fprintf(stderr, "Done!\n"); + return 0; +} + + diff --git a/src/rttcomplaint_sender.c b/src/rttcomplaint_sender.c new file mode 100644 index 0000000..20d7cda --- /dev/null +++ b/src/rttcomplaint_sender.c @@ -0,0 +1,103 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + Codes in this file are extracted and modified from complaint_sender.c + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +/* send socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +extern int my_FLOW_PORT; + +/* send buffer */ +char complaint_buffer[FLOW_BUFFSIZE]; +int *ccode_ptr; /* change from char to int -- mem alignment */ +int *cpage_ptr; + +/*---------------------------------------------------------- + init_complaint_sender initializes the buffer to allow the + catcher to send complaints back to the sender. + + ret_address of sender to whom we will complain + is determined when we receive the first UDP data + in read_handle_page() in page_reader.c + ----------------------------------------------------------*/ +void init_complaint_sender() +{ + if (verbose) + fprintf(stderr, "in init_complaint_sender\n"); + + /* init the send_socket */ + complaint_fd = complaint_socket(&complaint_addr, my_FLOW_PORT); + + ccode_ptr = (int *) complaint_buffer; + cpage_ptr = (int *)(ccode_ptr + 1); +} + +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin_addr); +} +#else +void update_complaint_address(struct sockaddr_in6 *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin6_addr); +} +#endif + +/*------------------------------------------------------------------------ + send_complaint fills the complaint buffer and send it through our socket + back to the sender + + The major use is to tell master machine which page of which file + needs to be re-transmitted. + complaint -- the complain code defined in main.h + file -- the file index + page -- page index + ------------------------------------------------------------------------*/ +void send_complaint(int complaint, int page) +{ + /* fill in the complaint data */ + /* 20060323 add converting to network byte-order before sending out */ + *ccode_ptr = htonl(complaint); + *cpage_ptr = htonl(page); + + /* send it */ + if( sendto(complaint_fd, complaint_buffer, FLOW_BUFFSIZE, 0, + (const struct sockaddr *)&complaint_addr, + sizeof(complaint_addr)) < 0) { + perror("Sending complaint\n"); + } + if (verbose) + printf("Sent complaint:code=%d page=%d\n", complaint, page); +} diff --git a/src/rttcomplaints.c b/src/rttcomplaints.c new file mode 100644 index 0000000..4f9ed9b --- /dev/null +++ b/src/rttcomplaints.c @@ -0,0 +1,270 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + codes in this file are extracted and modified from complaints.c + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" +#include + +/* buffer for receiving complaints */ + +char flow_buff[FLOW_BUFFSIZE]; +int *code_ptr; /* What's wrong? */ +int *page_ptr; /* Which page */ + +/* receive socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +extern int my_FLOW_PORT; + +/* status */ +char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */ +int total_missing_page = 0; +char machine_status = NOT_READY; +int nMachines = 1; +int nPages; +char *machine_list_file; + +extern char * machine; +extern int remote_pid; +extern char * reshell; + +/* + init_complaints initializes our buffers to receive complaint information + from the catchers +*/ +void init_complaints () +{ + if (verbose) + fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE); + + /* Buffer */ + code_ptr = (int *)flow_buff; + page_ptr = (int *)(code_ptr + 1); + + /* Receive socket */ + if (verbose) printf("set up receive socket for complaints\n"); + complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT); +} + +void init_missing_page_flag(int n) +{ + int i; + nPages = n; + if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) { + fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n); + perror("error = "); + exit(-1); + } + for(i=0; i nPages) return 1; /* *page_ptr = page # (1 origin) */ + ++(total_missing_page); + missing_page_flag[(page_v)-1] = MISSING; + return 1; + case LAST_MISSING : + if (page_v > nPages) return 1; + ++(total_missing_page); + missing_page_flag[page_v-1] = MISSING; + machine_status = MACHINE_OK; + return 1; + default : + printf("Unknown complaint: %d\n", code_v); + return 0; + } /* end of switch */ + } /* end of if(readable) */ + + return 0; +} + +int all_machine_ok() +{ + return (machine_status == NOT_READY ) ? 0 : 1; +} + +void wait_for_ok(int code) +{ + int i, count; + time_t tloc; + time_t rtime0, rtime1; + + rtime0 = time(&tloc); /* reference time */ + + count = 0; + while (!all_machine_ok()) { + if (read_handle_complaint()==1) { /* if there is a complaint handled */ + rtime0 = time(&tloc); /* reset the reference time */ + continue; + } + /* no complaints handled */ + rtime1 = time(&tloc); /* time since last complaints */ + if ((rtime1-rtime0) >= ACK_WAIT_PERIOD) { + ++count; + if (count < ACK_WAIT_TIMES) { + fprintf(stderr, "%d: resend cmd(%d) to machines:[ ", count, code); + for(i=0; i 0) { + kill_pid(); + } + + exit(-1); +} diff --git a/src/rttmain.h b/src/rttmain.h new file mode 100644 index 0000000..78eb693 --- /dev/null +++ b/src/rttmain.h @@ -0,0 +1,126 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __main_h +#define __main_h + +#include +#include +#include +#include +#include +#include +#include +#include /* sockaddr_in{} and other Internet defns */ +#include /* inet(3) functions */ +#include +#include /* for nonblocking */ +#include +#include +#include +#include +#include +#include /* for S_xxx file mode constants */ +#include /* for iovec{} and readv/writev */ +#include +#include +#include /* timeval{} for select() */ + +#define VERSION "3.1.0" + +/* logic values */ +#define FALSE 0 +#define TRUE 1 +#define FAIL (FALSE) +#define SUCCESS (TRUE) +#define GOOD_EXIT 0 +#define BAD_EXIT -1 + +/* Ports and addresses */ +#define PORT 7900 /* for multicast */ +#define FLOW_PORT (PORT-1) /* for flow-control */ +#define MCAST_ADDR "239.255.67.200" +#define MCAST_TTL 1 +#define MCAST_LOOP FALSE +#define MCAST_IF NULL + +#define REMOTE_SHELL "rsh" + +#define NO_FEEDBACK_COUNT_MAX 5 +#define USEC_TO_IDLE 1000000 + +/* Speed stuff */ +#define FAST 100 /* usec */ +#define DT_PERPAGE 8000 /* usec time interval between pages */ +#define FACTOR 50 + +/* time for the master to wait for the acknowledgement */ +#define ACK_WAIT_PERIOD 1 /* secs (from time()); */ +#define ACK_WAIT_TIMES 60 /* wait for this many periods */ + +/* complaints */ +#define TOO_FAST 100 +#define SEND_AGAIN 200 +#define START_OK 300 +#define MISSING_PAGE 500 +#define LAST_MISSING 600 +#define EOF_OK 700 +#define PAGE_RECV 800 + +#define FLOW_BUFFSIZE (2 * sizeof(int)) + +#define PAGE_SIZE 64512 /* max page_size allowed */ +#define HEAD_SIZE (3 * sizeof(int)) +#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE) +#define TOTAL_REC_PAGE 20 /* 31 20 */ + +/* Modes */ +#define TIMED_OUT 0 +#define TEST 1 +#define SENDING_DATA 2 +#define RESENDING_DATA 3 +#define START_CMD 4 +#define EOF_CMD 5 +#define ALL_DONE_CMD 6 +#define NULL_CMD 7 + +/* machine status */ +#define MACHINE_OK '\1' +#define NOT_READY '\0' + +/* PAGE STATUS */ +#define MISSING '\0' +#define RECEIVED '\1' + +/* MACHINE STATE */ +#define IDLE_STATE 0 +#define GET_DATA_STATE 1 +#define DATA_READY_STATE 2 + +#define MAX_PAGE_SIZE 64512 + +int verbose; + +#include "rttproto.h" + +#endif diff --git a/src/rttmissings.c b/src/rttmissings.c new file mode 100644 index 0000000..774e8b2 --- /dev/null +++ b/src/rttmissings.c @@ -0,0 +1,93 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +char * missingPages = NULL; /* array of flags */ +int nPages; + +int init_missingPages(int n) +{ + int i; + + nPages = n; + + /* init missingPages flags */ + if (missingPages != NULL) free(missingPages); /* for second round */ + missingPages = malloc(sizeof(char) * nPages); + for(i=0; i < nPages; ++i) + missingPages[i] = MISSING; + + return 0; +} + +int get_total_pages() +{ + return nPages; +} + +int missing_pages() +{ + int result; + int i; + + result = 0; + + for(i=0; i < nPages; ++i) + if ((missingPages[i]) == MISSING) ++result; + return result; +} + +int is_missing(int page) +{ + return (missingPages[page] == MISSING) ? 1 : 0; +} + +void page_received(int page) +{ + missingPages[page] = RECEIVED; +} + +int ask_for_missing_page() +{ + int i; + int n, count; + + n = missing_pages(); + if (n == 0) { + /* send_complaint(EOF_OK, machineID, 0); */ + return 0; /* nothing is missing */ + } + + count = 0; + for(i=0; i < nPages; ++i) { + if ( missingPages[i] == MISSING ) { + ++count; + send_complaint((count==n) ? LAST_MISSING : MISSING_PAGE, i+1); + usleep(DT_PERPAGE); + } + } + + return 1; /* there is something missing */ +} + diff --git a/src/rttpage_reader.c b/src/rttpage_reader.c new file mode 100644 index 0000000..35940e5 --- /dev/null +++ b/src/rttpage_reader.c @@ -0,0 +1,188 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + Codes in this file are extraced and modified from page_reader.c + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +/* the following is needed on Sun but not on linux */ +#ifdef _SUN +#include +#endif + +extern char * my_MCAST_ADDR; /* defined in rttcatcher.c */ +extern int my_PORT; +extern char * my_IFname; +int machineState; +int isFirstPage = TRUE; + +int recfd; +#ifndef IPV6 +struct sockaddr_in rec_addr; +#else +struct sockaddr_in6 rec_addr; +#endif + +int *mode_ptr; /* change from char to int */ +int *total_bytes_ptr; +int *current_page_ptr; +char *data_ptr; +char rec_buf[PAGE_BUFFSIZE]; + +void init_page_reader() +{ + struct ip_mreq mreq; + int rcv_size; + + machineState = IDLE_STATE; + + /* Prepare buffer */ + mode_ptr = (int*)rec_buf; /* hp: add the cast (int *) */ + current_page_ptr = (int *)(mode_ptr + 1); + total_bytes_ptr = (int *)(current_page_ptr + 1); + data_ptr = (char *)(total_bytes_ptr + 1); + + /* Set up receive socket */ + if (verbose) fprintf(stderr, "setting up receive socket\n"); + recfd = rec_socket(&rec_addr, my_PORT); + + /* Join the multicast group */ + if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) { + perror("Joining Multicast Group"); + } + + /* Increase socket receive buffer */ + rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE; + if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){ + perror("Expanding receive buffer"); + } +} + + +/* + This is the heart of catcher. + It parses the incoming pages and do proper reactions according + to the mode (command code) encoded in the first 4 bytes in an UDP page. + It returns the mode. + + Note: since rtt is intended to deal with one-to-one machine, + the four-state engine as in page_reader is not used. +*/ +int read_handle_page() +{ + #ifndef IPV6 + struct sockaddr_in return_addr; + #else + struct sockaddr_in6 return_addr; + #endif + + int bytes_read; + socklen_t return_len = (socklen_t)sizeof(return_addr); + int mode_v, total_bytes_v, current_page_v; + + /* -----------receiving data -----------------*/ + if (readable(recfd) == 1) { /* there is data coming in */ + /* get data */ + bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0, + (struct sockaddr *)&return_addr, + (socklen_t*) &return_len); + + total_bytes_v = ntohl(*total_bytes_ptr); + if (bytes_read != total_bytes_v) return NULL_CMD; + + /* convert from network byte order to host byte order */ + mode_v = ntohl(*mode_ptr); + current_page_v = ntohl(*current_page_ptr); + + if (isFirstPage) { + update_complaint_address(&return_addr); + isFirstPage = FALSE; + } + + /* get init wish list and return address first time only + if (firstTime){ + if (verbose) + fprintf(stderr, "Initializing complaint_sender and wish_list\n"); + init_complaint_sender(&return_addr); + firstTime = 0; + } + */ + + /* --- process various commands */ + switch (mode_v) { + case TEST: + /* It is just a test packet? */ + fprintf(stderr, "********** Received test packet **********\n"); + return mode_v; + + case START_CMD: + if (verbose) + fprintf(stderr, "Start cmd received ---\n"); + init_missingPages(current_page_v); /* Here: use current_page for total_pages */ + send_complaint(START_OK, 0); + return mode_v; + + case EOF_CMD: + if (verbose) + fprintf(stderr, "Check and ask for missing pages ---\n"); + + if (ask_for_missing_page()==0) { + /* + There is no missing page. + */ + send_complaint(EOF_OK, 0); + } + /* + else + There are missing pages. + Ack has been done in ask_for_missing_page() + */ + + return mode_v; + + case SENDING_DATA: + case RESENDING_DATA: + if (verbose){ + fprintf(stderr, "Got %d bytes from page %d of %d, mode=%d\n", + bytes_read - HEAD_SIZE, + current_page_v, get_total_pages(), mode_v); + } + + /* mode == SENDING_DATA, RESENDING_DATA */ + page_received(current_page_v); + send_complaint(PAGE_RECV, 0); + + /* Yes, we read a page */ + return mode_v; + case ALL_DONE_CMD: /* this is presumably a good machine */ + default: + return mode_v; + } /* end of switch */ + } else { + /* No, the read timed out */ + return TIMED_OUT; + } +} + + diff --git a/src/rttproto.h b/src/rttproto.h new file mode 100644 index 0000000..6d4cf62 --- /dev/null +++ b/src/rttproto.h @@ -0,0 +1,116 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __rttproto_h +#define __rttproto_h + +/* rttsends */ +void init_sends(int n); +void set_mode(int new_mode); +int send_page(int page); +void send_cmd(int code, int machine_id); +void send_all_done_cmd(); + +/* rttcomplaints */ +void init_complaints(); +int read_handle_complaint(); +void wait_for_ok(int code); +void refresh_machine_status(); +int is_it_missing(int page); +int has_missing_pages(); +void init_missing_page_flag(int n); +void free_missing_page_flag(); +void refresh_machine_status(); +int get_total_missing_pages(); +void page_sent(int page); +void pr_missing_pages(); +void do_cntl_c(int signo); +void send_done_and_pr_msgs(); +void do_badMachines_exit(char * machine, int pid); + +/* setup_socket.c */ +void set_delay(int secs, int usecs); +int readable(int fd); +#ifndef IPV6 +int complaint_socket(struct sockaddr_in *addr, int port); +int send_socket(struct sockaddr_in *addr, char * cp, int port); +int rec_socket(struct sockaddr_in *addr, int port); +#else +int rec_socket(struct sockaddr_in6 *addr, int port); +int send_socket(struct sockaddr_in6 *addr, char * cp, int port); +int complaint_socket(struct sockaddr_in6 *addr, int port); +#endif + +/* set_mcast.c */ +int mcast_set_if(int sockfd, const char *ifname, u_int ifindex); +int mcast_set_loop(int sockfd, int onoff); +int mcast_set_ttl(int sockfd, int val); + +/* set_catcher_mcast.c */ +int Mcast_join(int sockfd, const char *mcast_addr, + const char *ifname, u_int ifindex); +void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr); + +/* rttcomplaint_sender */ +void send_complaint(int complaint, int page); +void init_complaint_sender(); +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa); +#else +void update_complaint_address(struct sockaddr_in6 *sa); +#endif + +/* rttpage_reader */ +void init_page_reader(); +int read_handle_page(); + +/* rttmissings */ +int init_missingPages(int n); +int missing_pages(); +int is_missing(int page); +void page_received(int page); +int ask_for_missing_page(); +int get_total_pages(); + +/* timing */ +void refresh_timer(); +double get_accumulated_time(); +void start_timer(); +void end_timer(); +void update_time_accumulator(); +double get_accumulated_usec(); +void update_rtt_hist(unsigned int rtt); +void pr_rtt_hist(); +void init_rtt_hist(); + +/* signal.c */ +typedef void Sigfunc(int); /* for signal handlers */ +Sigfunc * Signal(int signo, Sigfunc *func); +int Fcntl(int fd, int cmd, int arg); +int Ioctl(int fd, int request, void *arg); +void Sigemptyset(sigset_t *set); +void Sigaddset(sigset_t *set, int signo); +void Sigprocmask(int how, const sigset_t *set, sigset_t *oset); + +#endif diff --git a/src/rttsends.c b/src/rttsends.c new file mode 100644 index 0000000..7038e2c --- /dev/null +++ b/src/rttsends.c @@ -0,0 +1,144 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + codes in this file are extracted and modified from sends.c + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" +#include +#ifdef _SUN +#include /* define SIOCGIFADDR */ +#else +#include +#endif + +extern char * my_MCAST_ADDR; +extern int my_PORT; +extern int my_TTL; +extern int my_LOOP; +extern char * my_IFname; + +/* buffer for sending (same structure as those in sends.c */ +int *mode_ptr; /* char type would cause alignment problem on Sparc */ +int *current_page_ptr; +int *total_bytes_ptr; +char *data_ptr; +char send_buff[PAGE_BUFFSIZE]; + +int pageSize; + +/* Send socket */ +int send_fd; +#ifndef IPV6 +struct sockaddr_in send_addr; +#else +struct sockaddr_in6 send_addr; +#endif + +/* + set_mode sets the caster into a new mode. + modes are defined in main.h: +*/ +void set_mode(int new_mode) +{ + *mode_ptr = htonl(new_mode); +} + + +/* init_sends initializes the send buffer */ +void init_sends(int npagesize) +{ + pageSize = (npagesize>PAGE_SIZE) ? PAGE_SIZE : npagesize; + + mode_ptr = (int *)send_buff; /* hp: add (int*) */ + current_page_ptr = (int *) (mode_ptr + 1); + total_bytes_ptr = (int *)(current_page_ptr + 1); + data_ptr = (char *)(total_bytes_ptr + 1); + + send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT); + + /******* change MULTICAST_IF ********/ + if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0) + perror("init_sends(): when set MULTICAST_IF\n"); + + /* set multicast_ttl such that UDP can go to 2nd subnetwork */ + if (mcast_set_ttl(send_fd, my_TTL) < 0) + perror("init_sends(): when set MULTICAST_TTL\n"); + + /* disable multicast_loop such that there is no echo back on master */ + if (mcast_set_loop(send_fd, my_LOOP) < 0) + perror("init_sends(): when set MULTICAST_LOOP\n"); + + /* put dummy contents into send (UDP) buffer */ + memset(data_ptr, 1, PAGE_SIZE); +} + +/* + send_buffer will send the buffer with the file information + out to the socket connection with the catcher. +*/ +int send_buffer(int bytes_read) +{ + /* Else send the data */ + if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE, + 0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) { + perror("Sending packet"); + exit(1); + } + return (1); +} + +/* + send_page takes a page from the current file and controls + sending it out the socket to the catcher. It calls send_buffer + to do the actuall call to sendto. +*/ +int send_page(int page) +{ + if (verbose>=2) fprintf(stderr, "in send_page\n"); + *total_bytes_ptr = htonl(pageSize+HEAD_SIZE); + *current_page_ptr = htonl(page); + + return send_buffer(pageSize); +} + + +void send_cmd(int code, int pages) +{ + *mode_ptr = htonl(code); + *current_page_ptr = htonl(pages); + *total_bytes_ptr = htonl(HEAD_SIZE); + + send_buffer(0); +} + + +void send_all_done_cmd() +{ + *mode_ptr = htonl(ALL_DONE_CMD); + *current_page_ptr = 0; + *total_bytes_ptr = htonl(HEAD_SIZE) ; + + send_buffer(0); + if (verbose) fprintf(stderr, "(ALL DONE)\n"); +} diff --git a/src/sends.c b/src/sends.c new file mode 100644 index 0000000..61aa07a --- /dev/null +++ b/src/sends.c @@ -0,0 +1,329 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + Following the suggestion by Robert Dack , + I added the option to change the default IP address for multicasting + and the PORT for flow control. See mrsync.py for the new options. + + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#ifdef _SUN +#include /* define SIOCGIFADDR */ +#else +#include +#endif + +extern char * my_MCAST_ADDR; /* defined in multicaster.c */ +extern int my_PORT; /* ditto */ +extern int my_TTL; +extern int my_LOOP; +extern char * my_IFname; /* defined in multicaster.c */ +extern int verbose; + +extern unsigned int nPages; +extern unsigned int last_bytes; /* the number of bytes in the last page of a file */ +extern int cur_entry; +extern int file_changed; +extern char* cmd_name[]; + +/* Where have we last sent from? */ +int most_recent_file; +int most_recent_fd; + +/* + Send buffer for storing the data and to be transmitted thru UDP + The format: + (5*sizeof(int) bytes header) + (PAGE_SIZE data area) + + The header has five int_type (4 bytes) int's. + (1) mode -- for master to give instructions to the target machines. + (2) current file index (starting with 1) + (3) current page index (starting with 1) -- gee! + (4) bytes to be sent in this page via UPD + (5) total number of pages for this file + + data_ptr points to the data area. +*/ +int *mode_ptr; /* char type would cause alignment problem on Sparc */ +int *current_file_ptr; +int *current_page_ptr; +int *bytes_sent_ptr; +int *total_pages_ptr; +char *data_ptr; +char *fill_here; +char send_buff[PAGE_BUFFSIZE]; + +/* Send socket */ +int send_fd; +#ifndef IPV6 +struct sockaddr_in send_addr; +#else +struct sockaddr_in6 send_addr; +#endif + +/* for final statistics */ +extern unsigned int total_pages; +extern off_t total_bytes; +off_t real_total_bytes; +unsigned int real_total_pages; + +/* + set_mode sets the caster into a new mode. + modes are defined in main.h: +*/ +void set_mode(int new_mode) +{ + /* 20060323 convert it to network byte order */ + *mode_ptr = htonl(new_mode); +} + +/* init_sends initializes the send buffer */ +void init_sends() +{ + most_recent_file = most_recent_fd = -99999; + real_total_bytes = 0; + real_total_pages = 0; + + /* pointers for send buffer */ + mode_ptr = (int *)send_buff; + current_file_ptr = (int *)(mode_ptr+1); + current_page_ptr = (int *)(current_file_ptr + 1); + bytes_sent_ptr = (int *)(current_page_ptr + 1); + total_pages_ptr = (int *)(bytes_sent_ptr + 1); + data_ptr = (char *)(total_pages_ptr + 1); + fill_here = data_ptr; + + /* send socket */ + send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT); + + /******* change MULTICAST_IF ********/ + if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0) + perror("init_sends(): when set MULTICAST_IF\n"); + + /* set multicast_ttl such that UDP can go to 2nd subnetwork */ + if (mcast_set_ttl(send_fd, my_TTL) < 0) + perror("init_sends(): when set MULTICAST_TTL\n"); + + /* disable multicast_loop such that there is no echo back on master */ + if (mcast_set_loop(send_fd, my_LOOP) < 0) + perror("init_sends(): when set MULTICAST_LOOP\n"); +} + +void clear_send_buf() +{ + fill_here = data_ptr; +} + +/* + put file contents into send (UDP) buffer + return the number of bytes put into the buffer. +*/ +ssize_t pack_page_for_file(int page) +{ + if(verbose>=2) + fprintf(stderr, "Sending page %d of file %d\n", page, current_entry()); + + /*** + Adjust the position for reading + NOTE:if the type (off_t) is not given, the large file operation + would fail. + ***/ + lseek(most_recent_fd, (off_t)PAGE_SIZE * (off_t)(page - 1), SEEK_SET); + + /* read it and put the content into send_buff */ + /* the max number this read() will return is PAGE_SIZE */ + return read(most_recent_fd, data_ptr, PAGE_SIZE); +} + +int fexist(int entry) +{ + if (entry != most_recent_file) { + if (most_recent_fd > 0) close(most_recent_fd); /* make sure we close it */ + + if((most_recent_fd = open(getFullname(), O_RDONLY, 0)) < 0){ + perror(getFullname()); + } + most_recent_file = entry; + } + return (most_recent_fd >= 0); /* <0 means FAIL */ +} + + +/* + send_buffer will send the buffer with the file information + out to the socket connection with the catcher. + return 0 -- ok, -1 sent failed. +*/ +int send_buffer(int bytes_read) +{ + /* send the data */ + if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE, + 0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) { + perror("Sending packet"); + return FAIL; + } + return SUCCESS; +} + +/* + send_page takes a page from the current file and controls + sending it out the socket to the catcher. It calls send_buffer + to do the actuall call to sendto. +*/ +int send_page(int page) +{ + unsigned int bytes; + + if (verbose>=2) fprintf(stderr, "In send_page()\n"); + + if (file_changed) { + total_bytes -= ((page=3) + fprintf(stderr, "Sending page=%d of %d in file %d of %d\n", + page, nPages, + cur_entry, total_entries()); + return send_buffer(bytes); +} + +/* + send_test zeroes out the buffers going to the catcher + and thereby sends a test packet to the catcher. +*/ +void send_test() +{ + *mode_ptr = htonl(TEST); /* --> network byte order */ + + *current_file_ptr = 0; + *current_page_ptr = 0; + *total_pages_ptr = 0; + *bytes_sent_ptr = 0; + + send_buffer(0); + fprintf(stderr, "Test packet is sent.\n"); +} + +void send_cmd(int code, int machine_id) +{ + /* + Except for OPEN_FILE_CMD, + only the header area in the send_buff gets filled + with data. + */ + *mode_ptr = htonl(code); /* --> network byte order */ + *current_page_ptr = htonl(machine_id); /* -1 being all machines */ + *current_file_ptr = htonl(cur_entry); + *total_pages_ptr = htonl(nPages); + + *bytes_sent_ptr = (code == OPEN_FILE_CMD) ? htonl(fill_here - data_ptr) :0; + + /* do the header in send_buf */ + send_buffer((code == OPEN_FILE_CMD) ? fill_here - data_ptr : 0); + + /* print message */ + if (verbose>=2) { + fprintf(stderr, "cmd [%s] sent\n", cmd_name[code]); + } +} + +void pack_open_file_info() +{ + /* prepare udp send_buffer for file info + (header) (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0 + */ + clear_send_buf(); + fill_here += fill_in_stat(data_ptr); + fill_here += fill_in_filename(fill_here); + if (is_softlink() || is_hardlink()) { + fill_here += fill_in_linktar(fill_here); + } +} + +void send_all_done_cmd() +{ + int i; + *mode_ptr = htonl(ALL_DONE_CMD); /* --> network byte order */ + + *current_file_ptr = 0; + *current_page_ptr = 0; + *total_pages_ptr = 0; + *bytes_sent_ptr = 0; + + for(i=0; i<10; ++i) { /* do it many times, in case network is busy */ + send_buffer(0); + usleep(DT_PERPAGE*10); + } + /* NOTE: it is still possible that ALL_DONE msg could not + be received by targets. + For total robustness, some independent checking on targets + should be done. + */ + fprintf(stderr, "(ALL DONE)\n"); +} + +void my_exit(int good_or_bad) +{ + if (send_fd) send_all_done_cmd(); + exit(good_or_bad); +} diff --git a/src/set_catcher_mcast.c b/src/set_catcher_mcast.c new file mode 100644 index 0000000..9a04017 --- /dev/null +++ b/src/set_catcher_mcast.c @@ -0,0 +1,142 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + + This file collects functions related to setting multicast + for multicatcher. They are IPv4 and IPv6 ready. + By default, we use IPv4. + To use IPv6, we need to specify -DIPv6 in Makefile. + The functions in this file are collected from + Richard Stevens' Networking bible: Unix Network programming + + I added Mcast_join(). + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#ifdef _SUN +#include /* define SIOCGIFADDR */ +#else +#include +#endif + +#define SA struct sockaddr + +int mcast_join(int sockfd, const SA *sa, socklen_t salen, + const char *ifname, u_int ifindex) +{ + switch (sa->sa_family) { + case AF_INET: { + struct ip_mreq mreq; + struct ifreq ifreq; + + memcpy(&mreq.imr_multiaddr, + &((struct sockaddr_in *) sa)->sin_addr, + sizeof(struct in_addr)); + + if (ifindex > 0) { + if (if_indextoname(ifindex, ifreq.ifr_name) == NULL) { + errno = ENXIO; /* i/f index not found */ + return(-1); + } + goto doioctl; + } else if (ifname != NULL) { + strncpy(ifreq.ifr_name, ifname, IFNAMSIZ); +doioctl: + if (ioctl(sockfd, SIOCGIFADDR, &ifreq) < 0) + return(-1); + memcpy(&mreq.imr_interface, + &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr, + sizeof(struct in_addr)); + } else + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + + return(setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mreq, sizeof(mreq))); + } + +#ifdef IPV6 + case AF_INET6: { + struct ipv6_mreq mreq6; + + memcpy(&mreq6.ipv6mr_multiaddr, + &((struct sockaddr_in6 *) sa)->sin6_addr, + sizeof(struct in6_addr)); + + if (ifindex > 0) + mreq6.ipv6mr_interface = ifindex; + else if (ifname != NULL) + if ( (mreq6.ipv6mr_interface = if_nametoindex(ifname)) == 0) { + errno = ENXIO; /* i/f name not found */ + return(-1); + } + else + mreq6.ipv6mr_interface = 0; + + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, + &mreq6, sizeof(mreq6))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} + +int Mcast_join(int sockfd, const char *mcast_addr, + const char *ifname, u_int ifindex) +{ + #ifndef IPV6 + /* IPv4 */ + struct sockaddr_in sa; + sa.sin_family = AF_INET; + inet_pton(AF_INET, mcast_addr, &sa.sin_addr); + #else + struct sockaddr_in6 sa; + sa.sin6_family = AF_INET6; + inet_pton(AF_INET6, mcast_addr, &sa.sin6_addr); + #endif + + return (mcast_join(sockfd, (struct sockaddr *) &sa, sizeof(sa), + ifname, ifindex)); +} + +void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr) +{ + switch (sa->sa_family) { + case AF_INET: { + struct sockaddr_in *sin = (struct sockaddr_in *) sa; + + memcpy(&sin->sin_addr, addr, sizeof(struct in_addr)); + return; + } + + #ifdef IPV6 + case AF_INET6: { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *) sa; + + memcpy(&sin6->sin6_addr, addr, sizeof(struct in6_addr)); + return; + } + #endif + } + + return; +} + diff --git a/src/set_mcast.c b/src/set_mcast.c new file mode 100644 index 0000000..aac3d36 --- /dev/null +++ b/src/set_mcast.c @@ -0,0 +1,160 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + + This file collects functions related to setting multicast + for multicaster. They are IPv4 and IPv6 ready. + By default, we use IPv4. + To use IPv6, we need to specify -DIPv6 in Makefile. + The functions in this file are collected from + Richard Stevens' Networking bible: Unix Network programming + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#ifdef _SUN +#include /* define SIOCGIFADDR */ +#else +#include +#endif + +#define SA struct sockaddr +#define MAXSOCKADDR 128 /* max socket address structure size */ + +int sockfd_to_family(int sockfd) +{ + union { + struct sockaddr sa; + char data[MAXSOCKADDR]; + } un; + socklen_t len; + + len = MAXSOCKADDR; + if (getsockname(sockfd, (SA *) un.data, &len) < 0) + return(-1); + return(un.sa.sa_family); +} + +int mcast_set_if(int sockfd, const char *ifname, u_int ifindex) +{ + switch (sockfd_to_family(sockfd)) { + case AF_INET: { + struct in_addr inaddr; + struct ifreq ifreq; + + if (ifindex > 0) { + if (if_indextoname(ifindex, ifreq.ifr_name) == NULL) { + errno = ENXIO; /* i/f index not found */ + return(-1); + } + goto doioctl; + } else if (ifname != NULL) { + strncpy(ifreq.ifr_name, ifname, IFNAMSIZ); +doioctl: + if (ioctl(sockfd, SIOCGIFADDR, &ifreq) < 0) + return(-1); + memcpy(&inaddr, + &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr, + sizeof(struct in_addr)); + } else + inaddr.s_addr = htonl(INADDR_ANY); /* remove prev. set default */ + + return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_IF, + &inaddr, sizeof(struct in_addr))); + } + +#ifdef IPV6 + case AF_INET6: { + u_int index; + + if ( (index = ifindex) == 0) { + if (ifname == NULL) { + errno = EINVAL; /* must supply either index or name */ + return(-1); + } + if ( (index = if_nametoindex(ifname)) == 0) { + errno = ENXIO; /* i/f name not found */ + return(-1); + } + } + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_IF, + &index, sizeof(index))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} + +int mcast_set_loop(int sockfd, int onoff) +{ + switch (sockfd_to_family(sockfd)) { + case AF_INET: { + u_char flag; + + flag = onoff; + return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_LOOP, + &flag, sizeof(flag))); + } + +#ifdef IPV6 + case AF_INET6: { + u_int flag; + + flag = onoff; + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, + &flag, sizeof(flag))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} +/* end mcast_set_loop */ + +int mcast_set_ttl(int sockfd, int val) +{ + switch (sockfd_to_family(sockfd)) { + case AF_INET: { + u_char ttl; + + ttl = val; + return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, + &ttl, sizeof(ttl))); + } + +#ifdef IPV6 + case AF_INET6: { + int hop; + + hop = val; + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, + &hop, sizeof(hop))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} + diff --git a/src/setup_socket.c b/src/setup_socket.c new file mode 100644 index 0000000..3625a2a --- /dev/null +++ b/src/setup_socket.c @@ -0,0 +1,242 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + make it IPv6-ready + Copyright (C) 2005 Renaissance Technologies Corp. + file name is changed from main.c to setup_socket.c + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ +/* + This part was based on + (1) codes by Aaron Hillegass + (2) codes in Steven's book 'network programming' + + 200605 change it to make it IPv6 ready + +*/ + +#include +#include +#include +#include +#include +#include /* sockaddr_in{} and other Internet defns */ +#include /* inet(3) functions */ +#include + +extern int verbose; +int delay_sec; +int delay_usec; + + +/* Set up for catcher's complaint socket */ +#ifndef IPV6 +int complaint_socket(struct sockaddr_in *addr, int port) +#else +int complaint_socket(struct sockaddr_in6 *addr, int port) +#endif +{ + int fd, sockaddr_len; + sa_family_t family; + if (verbose>=2) fprintf(stderr, "in send_socket_ip\n"); + + #ifndef IPV6 + /* IPv4 */ + sockaddr_len = sizeof(struct sockaddr_in); + memset(addr, 0, sockaddr_len); + addr->sin_family = AF_INET; + family = AF_INET; + addr->sin_port = htons(port); + /*addr->sin_addr.s_addr = ip; this fx in for init process only + This ip will be overwritten later after + 1st packet is received. */ + #else + /* IPv6 */ + sockaddr_len = sizeof(struct sockaddr_in6); + memset(addr, 0, sockaddr_len); + addr->sin6_family = AF_INET6; + family = AF_INET6; + addr->sin6_port = htons(port); + /* addr->sin_addr.s_addr = ip; see comments above */ + #endif + + if ((fd = socket(family, SOCK_DGRAM, 0)) < 0){ + perror("Send socket"); + exit(1); + } + return fd; +} + +/* Set up mcast send socket for multicaster based on (char*)cp */ +#ifndef IPV6 +int send_socket(struct sockaddr_in *addr, char * cp, int port) +#else +int send_socket(struct sockaddr_in6 *addr, char * cp, int port) +#endif +{ + int fd, sockaddr_len; + sa_family_t family; + char buf[50]; + if (verbose>=2) fprintf(stderr, "in send_socket\n"); + + #ifndef IPV6 + /* IPv4 */ + sockaddr_len = sizeof(struct sockaddr_in); + memset(addr, 0, sockaddr_len); + addr->sin_family = AF_INET; + family = AF_INET; + addr->sin_port = htons(port); + /*addr->sin_addr.s_addr = inet_addr(cp);*/ + inet_pton(AF_INET, cp, &addr->sin_addr); + /* Print out IP address and port */ + inet_ntop(AF_INET, &addr->sin_addr, buf, 50); + #else + sockaddr_len = sizeof(struct sockaddr_in6); + memset(addr, 0, sockaddr_len); + addr->sin6_family = AF_INET6; + family = AF_INET6; + addr->sin6_port = htons(port); + /*addr->sin_addr.s_addr = inet_addr(cp);*/ + inet_pton(AF_INET6, cp, &addr->sin6_addr); + /* Print out IP address and port */ + inet_ntop(AF_INET6, &addr->sin6_addr, buf, 50); + #endif + + if (verbose>=2) + fprintf(stderr, "Creating a send socket to %s:%d\n", buf, port); + + if ((fd = socket(family, SOCK_DGRAM, 0)) < 0){ + perror("Send socket"); + exit(1); + } + + if ((bind(fd, (const struct sockaddr *)addr, sockaddr_len)) < 0){ + perror("in send_socket(): bind error (need to change MCAST_ADDR)"); + exit(1); + } + return fd; /*send_socket_ip(addr, address, port);*/ +} + +/* set up socket on the receiving end */ +#ifndef IPV6 +int rec_socket(struct sockaddr_in *addr, int port) +#else +int rec_socket(struct sockaddr_in6 *addr, int port) +#endif +{ + int fd, sockaddr_len; + sa_family_t family; + + #ifndef IPV6 + /* IPv4 */ + sockaddr_len = sizeof(struct sockaddr_in); + memset(addr, 0, sockaddr_len); + addr->sin_family = AF_INET; + family = AF_INET; + addr->sin_port = htons(port); + addr->sin_addr.s_addr = htonl(INADDR_ANY); + #else + sockaddr_len = sizeof(struct sockaddr_in6); + memset(addr, 0, sockaddr_len); + addr->sin6_family = AF_INET6; + family = AF_INET6; + addr->sin6_port = htons(port); + addr->sin6_addr = in6addr_any; /* RS book: page 92 */ + #endif + + if((fd = socket(family, SOCK_DGRAM, 0)) < 0){ + perror("Socket create"); + exit(1); + } + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, NULL, 0); + + if (verbose>=2) + fprintf(stderr, "Creating receive socket on port %d\n", port); + + /*if ((bind(fd, addr, sizeof(*addr))) < 0){ MOD by RWM: replaced by next line */ + if ((bind(fd, (const struct sockaddr *)addr, sockaddr_len)) < 0){ + perror("in rec_socket(): bind error (need to change PORT)"); + exit(1); + } + return fd; +} + +/* + set the values of two variables to be used by select() + in readable(). +*/ +void set_delay(int secs, int usecs) +{ + if (verbose>=2) { + if (usecs == -1) + fprintf(stderr, "Timeout: set to infinite\n"); + else + fprintf(stderr, "Timeout: set to %d sec + %d usec\n", secs, usecs); + } + delay_sec = secs; + delay_usec = usecs; +} + +void get_delay(int * secs, int * usecs) +{ + *secs = delay_sec; + *usecs= delay_usec; +} + +/* + Check if there is an incoming UDP for a certain amount + of time period specified in delay_tv.. + + Return 'true' if there is. + Return 'false' if no valid UDP has arrived within the time period. + + In multicaster, this is used in read_handle_page() right after + send_page() is carried out. As a result, read_handle_complaint() waits + for any incoming complaints from any target machines within + the specified time period. As it is now, no target machine + will send back complaints during the transmission of all the + pages for a file. Therefore, read_handle_complaint() serves effectively + as a time delay between sending of each page. +*/ +int readable(int fd) +{ + struct timeval delay_tv; + fd_set rset; + FD_ZERO(&rset); + FD_SET(fd, &rset); + + /* + if microsec == -1 wait forever for a packet. + This is used in the beginning when multicatcher is just + invoked. + */ + if (delay_usec == -1){ + return(select(fd + 1, &rset, NULL, NULL, NULL)); + } else { + delay_tv.tv_sec = delay_sec; + delay_tv.tv_usec = delay_usec; + return (select(fd + 1, &rset, NULL, NULL, &delay_tv)); + } +} + diff --git a/src/signal.c b/src/signal.c new file mode 100644 index 0000000..5b49b52 --- /dev/null +++ b/src/signal.c @@ -0,0 +1,93 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + The code in this file is copied from + Richard Stevens' book + "UNIX Network Programming" Chap.22.3 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "signal.h" + +Sigfunc * signal(int signo, Sigfunc *func) +{ + struct sigaction act, oact; + + act.sa_handler = func; + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + if (signo == SIGALRM) { +#ifdef SA_INTERRUPT + act.sa_flags |= SA_INTERRUPT; /* SunOS 4.x */ +#endif + } else { +#ifdef SA_RESTART + act.sa_flags |= SA_RESTART; /* SVR4, 44BSD */ +#endif + } + if (sigaction(signo, &act, &oact) < 0) + return(SIG_ERR); + return(oact.sa_handler); +} +/* end signal */ + +Sigfunc * Signal(int signo, Sigfunc *func) /* for our signal() function */ +{ + Sigfunc *sigfunc; + + if ( (sigfunc = signal(signo, func)) == SIG_ERR) + perror("signal error"); + return(sigfunc); +} + +int Fcntl(int fd, int cmd, int arg) +{ + int n; + + if ( (n = fcntl(fd, cmd, arg)) == -1) + perror("fcntl error"); + return(n); +} + +int Ioctl(int fd, int request, void *arg) +{ + int n; + + if ( (n = ioctl(fd, request, arg)) == -1) + perror("ioctl error"); + return(n); /* streamio of I_LIST returns value */ +} + +void Sigemptyset(sigset_t *set) +{ + if (sigemptyset(set) == -1) + perror("sigemptyset error"); +} + +void Sigaddset(sigset_t *set, int signo) +{ + if (sigaddset(set, signo) == -1) + perror("sigaddset error"); +} + +void Sigprocmask(int how, const sigset_t *set, sigset_t *oset) +{ + if (sigprocmask(how, set, oset) == -1) + perror("sigprocmask error"); +} diff --git a/src/signal.h b/src/signal.h new file mode 100644 index 0000000..bf309f8 --- /dev/null +++ b/src/signal.h @@ -0,0 +1,32 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include +#include +#include +#include +/******* on linux: ioctl() is defined in sys/ioctl.h instead of unistd.h as on SunOS *****/ +#include +#include + +typedef void Sigfunc(int); /* for signal handlers */ + diff --git a/src/timing.c b/src/timing.c new file mode 100644 index 0000000..d4baa15 --- /dev/null +++ b/src/timing.c @@ -0,0 +1,109 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include +#include + +#define N 501 /* effectively set the maximum time in rtt_hist to be 500 msec */ + +extern int verbose; + +/* for timing */ +struct timeval tv0, tv1; +unsigned long usec_acc, sec_acc; /* accumulator of timing */ +unsigned int rtt_hist[N]; /* rtt_hist[i] = count of rtt within (i, i+1) */ + +void refresh_timer() +{ + usec_acc = 0; + sec_acc = 0; +} + +void start_timer() +{ + struct timezone tz; + gettimeofday(&tv0, &tz); +} + +void end_timer() +{ + struct timezone tz; + gettimeofday(&tv1, &tz); /* end timer -------- */ +} + +void update_time_accumulator() +{ + if (tv1.tv_usec(N-2)) index = N-1; + rtt_hist[index]++; +} + +void pr_rtt_hist() +{ + int i; + fprintf(stderr, "rtt histogram\n"); + fprintf(stderr, "msec counts\n"); + fprintf(stderr, "---- --------\n"); + for(i=0; i10) continue; + if (rtt_hist[i] != 0) { + fprintf(stderr, "%4d %u\n", i, rtt_hist[i]); + } + } +} + +unsigned int pages_wo_ack() +{ + return rtt_hist[N-1]; +} diff --git a/src/trFilelist.c b/src/trFilelist.c new file mode 100644 index 0000000..8f65796 --- /dev/null +++ b/src/trFilelist.c @@ -0,0 +1,449 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +/***this code parse the synclist file generated by rsync in dry-run mode. + The minimum options for rsync is : -avW --dry-run --delete + e.g. + /usr/local/bin/rsync --rsync-path=/usr/local/bin/rsync + -avW --dry-run --delete + /src/path/ dest_machine:/target/path/ > output 2>&1 +A typical output of rsync may look like this: +Client is very old version of rsync, upgrade recommended. +building file list ... done +xyz -> ./sub1/xyz +file1 +fn +fn1 +j +sub1/hardlink_to_file1 +sub1/path/testfile +sub1/xyz +sent 337 bytes read 44 bytes 254.00 bytes/sec +total size is 320751046 speedup is 841866.26 + +This code does the following three things. +(1) skip the lines before 'done' and after 'wrote' +(2) output all directories and file_path + e.g + for an entry: sub1/path/testfile, the output is + sub1 + sub1/path + sub1/path/testfile +(3) xyz -> ./sub1/xyz + the output is + xyz +(4) If file1 and sub1/hardlink are hardlinked + the output is + file1 + sub1/hardlink file1 + +For the example output above, the output of this code is: + +xyz +file1 +fn +fn1 +j +sub1 +sub1/hardlink_to_file1 file1 +sub1/path +sub1/path/testfile +sub1/xyz + +***/ + +#include +#include +#include +#include +#include /* to define PATH_MAX */ +#include + +#define TRUE 1 +#define FALSE 0 + +struct string_list { + int capacity; + char ** endp; + char ** str; +}; + +void init_string_list(struct string_list * str_ptr, int n) +{ + str_ptr->str = malloc(n * sizeof(void*)); + str_ptr->capacity = n; + str_ptr->endp = str_ptr->str; +} + +void grow_string_list(struct string_list * slp) +{ + int new_capacity = 2 * slp->capacity; + char ** old_ptrs = slp->str; + char ** new_ptrs; + char ** newp = malloc(new_capacity * sizeof(void *)); + new_ptrs = newp; + + while (old_ptrs < slp->endp) { + *new_ptrs++ = *old_ptrs++; + } + + free(slp->str); + slp->str = newp; + slp->endp= new_ptrs; + slp->capacity = new_capacity; +} + +void append_string_list(char * str, struct string_list * slp) +{ + if (slp->endp - slp->str == slp->capacity) grow_string_list(slp); + *slp->endp = strdup(str); + (slp->endp)++; +} + +/*************** change to return index ****/ +int find_string(char * str, struct string_list * slp) +{ + /* find if str is in the list */ + int i; + int n = slp->endp - slp->str; + + for(i=0; istr)[i], str)==0) return i; + } + return -1; +} + +/* find if a string in string-list is a sub-string of str */ +int has_sub_string(char * str, struct string_list *slp) +{ + int i; + int n = slp->endp - slp->str; + + for(i=0; istr)[i], strlen((slp->str)[i]))==0) return i; + } + return -1; +} + +/* find if the str is a substr of those in slp */ +int has_newdir(char *str, struct string_list *slp) +{ + int i; + int n = slp->endp - slp->str; + + for(i=0; istr)[i],str, strlen(str))==0) return i; + } + return -1; +} + +struct uint_list { + int capacity; + unsigned int * endp; + unsigned int * d; +}; + +void init_uint_list(struct uint_list * uil_ptr, int n) +{ + uil_ptr->d = malloc(n * sizeof(unsigned int)); + uil_ptr->capacity = n; + uil_ptr->endp = uil_ptr->d; +} + +void grow_uint_list(struct uint_list * uilp) +{ + int new_capacity = 2 * uilp->capacity; + unsigned int * old_ptrs = uilp->d; + unsigned int * new_ptrs; + unsigned int * newp = malloc(new_capacity * sizeof(unsigned int)); + new_ptrs = newp; + + while (old_ptrs < uilp->endp) { + *new_ptrs++ = *old_ptrs++; + } + + free(uilp->d); + uilp->d = newp; + uilp->endp= new_ptrs; + uilp->capacity = new_capacity; +} + +void append_uint_list(unsigned int data, struct uint_list * uilp) +{ + if (uilp->endp - uilp->d == uilp->capacity) grow_uint_list(uilp); + *uilp->endp = data; + (uilp->endp)++; +} +/*************** change to return index ****/ +int find_unit(unsigned int data, struct uint_list * uilp) +{ + /* find if data is in the list */ + int i; + int n = uilp->endp - uilp->d; + + for(i=0; id)[i] == data) return i; + } + return -1; +} + +struct string_list file_list; +struct uint_list ino_list; +struct string_list dir_list; +struct string_list softlink_list; /* for (a) */ +struct string_list newdir_list; /* for (b) */ + +void strip(char * str) +{ + /* remove trailing \n and spaces */ + char *pt; + char *pc = &str[strlen(str)-1]; + while (*pc == ' ' || *pc == '\n') *(pc--) = '\0'; + /* 20080317 remove leading spaces */ + pt = pc = &str[0]; + while (*pc == ' ') ++pc; + if (pc != pt) { + while (*pc != '\0') *pt++ = *pc++; + *pt = '\0'; + } +} + +void output_subs(char * str) +{ + return; /*************************** testing ***************/ + /* to do (2) indicated in the above */ + /******** + char * pc; + char subs[PATH_MAX]; + pc = strstr(str, "/"); + if (!pc) return; + + while (pc) { + strncpy(subs, str, pc-str); + subs[pc-str] = '\0'; + if (find_string(subs, &dir_list)<0) { + printf("%s\n", subs); + append_string_list(subs, &dir_list); + } + pc = strstr(pc+1, "/"); + } + ************/ +} + +/*** (a) + get those softlinks that points to a directory + this is to deal with the following scenario + previous structure + dir_path (a directory) + db (a directory) + + newly updated structure on master + dir_path -> db + db + + rsync --dry-run generates + dir_path -> db [a link is done on target] + deleting dir_path/sub/filename1 [wrong file gets removed ] + deleting dir_path/sub/filename2... + + file_operations.c does this when dir_path -> db is due + delete dir_path (rm -rf) + make the softlink + But then the following delete will have undesired deletion. + + ------------------------------------------------------------ + + (b) + t0 name -> xyz name -> xyz (target) + t1 name/ name -> xyz + + rsync generates + name/ update_directory() won't have effect + name/f1 delivered to wrong place + name/f2 + deleting name too late + ** the deletion should be done before not after. + For now, I will fail this code for this situation. + +***/ +void get_dir_softlinks(char *filename, char * basedir) { + FILE * fd; + char line[PATH_MAX]; + struct stat st; + + if ((fd = fopen(filename, "r")) == NULL) { + fprintf(stderr, "Cannot open file -- %s \n", filename); + exit(-1); + } + + while (1) { /* for each line in the file */ + char *pc; + char fn[PATH_MAX]; + + if (fgets(line, PATH_MAX, fd)==NULL) break; + strip(line); + if (strlen(line) == 0) continue; /* skip blank line */ + + /* the softlink case is indicated by -> */ + pc= strstr(line, " -> "); + if (pc) { /* it is a softlink */ + *pc = '\0'; + /* check if it is a directory */ + sprintf(fn, "%s/%s", basedir, line); + + /* check if the link-target is a directory */ + if (stat(fn, &st)<0) continue; /* We skip this bad entry - no longer exist */ + + if (S_ISDIR(st.st_mode)) { + append_string_list(line, &softlink_list); + } + } else { /* not a softlink --> find if it is a directory */ + /* find a line without ' ' and with trailing '/' */ + pc = strstr(line, " "); /* the first space */ + if (!pc) { + char * plast = &line[0] + strlen(line) - 1; + if (*plast == '/') { + append_string_list(line, &newdir_list); + } + } + } + } + + fclose(fd); +} + + +int main(int argc, char * argv[]) +{ + char * filename; + char * basedir; + FILE *fd; + char line[PATH_MAX]; + + if (argc < 3) { + fprintf(stderr, "Usage: trFilelist synclist_filename basedir\n"); + exit(-1); + } + + filename = argv[1]; + basedir = argv[2]; + + init_string_list(&file_list, 10); + init_uint_list(&ino_list, 10); + init_string_list(&dir_list, 100); + init_string_list(&softlink_list, 10); + init_string_list(&newdir_list, 100); + + get_dir_softlinks(filename, basedir); + + if ((fd = fopen(filename, "r")) == NULL) { + fprintf(stderr, "Cannot open file -- %s \n", filename); + return -1; + } + + while (1) { /* for each line in the file */ + char *pc; + char fn[PATH_MAX]; + struct stat st; + int newdir_flag; + + if (fgets(line, PATH_MAX, fd)==NULL) break; + strip(line); + if (strlen(line) == 0) continue; /* skip blank line */ + if (strcmp(line, ".")==0) continue; + if (strcmp(line, "./")==0) continue; + + /* first we look for deleting entry */ + if (strncmp(line, "deleting ", 9)==0) { + /* deleting (directory) file_path */ + char * p1, *p2, *pf; + + p1 = strstr(line, " "); /* the first space */ + p2 = strstr(p1+1, " "); /* deleting directory filepath * 20070912 this is old */ + pf = (p2) ? p2+1 : p1+1;/* it's always p1+1 */ + + newdir_flag = has_newdir(pf, &newdir_list); + + if ((has_sub_string(pf, &softlink_list)<0) && newdir_flag<0) { + /* see comments above get_dir_softlinks() */ + printf("deleting %s\n", pf); + } else if (newdir_flag>=0) { /* temporary action */ + /*** we can simply skip this block later. 20070912 ***/ + /***/ + fprintf(stderr, "CRITICAL ERROR: An old softlink has been changed to a directory!\n"); + fprintf(stderr, " For now, we crash this code for human intervention\n"); + fprintf(stderr, " line= %s\n", line); + exit(-1); + /***/ + } + + continue; + } + + /* the softlink case is indicated by -> */ + pc= strstr(line, " -> "); + if (pc) { + *pc = '\0'; + output_subs(line); + printf("%s\n", line); + continue; + } + + /* if rsync's -H is turned on, the output may contain + file => tar_hardlink_file (relative address) + */ + pc= strstr(line, " => "); + if (pc) { + *pc = '\0'; + output_subs(line); + printf("%s %s\n", line, pc+4); + continue; + } + + /* the rest of the entries should be valid paths */ + sprintf(fn, "%s/%s", basedir, line); + if (lstat(fn, &st)<0) continue; /* We skip this bad entry - + (1) the header and tail lines + (2) perhaps the file no longer exists */ + + /* is this a hardlink? */ + if (st.st_nlink > 1) { + int index; + output_subs(line); + if ((index = find_unit((unsigned int)st.st_ino, &ino_list))<0) { + append_uint_list((unsigned int)st.st_ino, &ino_list); + append_string_list(line, &file_list); /* relative path */ + printf("%s\n", line); + } else { + printf("%s %s\n", line, file_list.str[index]); + } + continue; + } + + /* all others */ + output_subs(line); + printf("%s\n", line); + } /* end of one line */ + + fclose(fd); + return 0; +} -- cgit v1.2.3-70-g09d2