aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backup.c108
-rw-r--r--src/complaint_sender.c158
-rw-r--r--src/complaints.c579
-rw-r--r--src/file_operations.c800
-rw-r--r--src/global.c35
-rw-r--r--src/id_map.c74
-rw-r--r--src/main.h189
-rw-r--r--src/main.h.in189
-rw-r--r--src/multicaster.c582
-rw-r--r--src/multicatcher.c181
-rw-r--r--src/page_reader.c426
-rw-r--r--src/parse_synclist.c320
-rw-r--r--src/proto.h182
-rw-r--r--src/rtt.c258
-rw-r--r--src/rttcatcher.c118
-rw-r--r--src/rttcomplaint_sender.c103
-rw-r--r--src/rttcomplaints.c270
-rw-r--r--src/rttmain.h126
-rw-r--r--src/rttmissings.c93
-rw-r--r--src/rttpage_reader.c188
-rw-r--r--src/rttproto.h116
-rw-r--r--src/rttsends.c144
-rw-r--r--src/sends.c329
-rw-r--r--src/set_catcher_mcast.c142
-rw-r--r--src/set_mcast.c160
-rw-r--r--src/setup_socket.c242
-rw-r--r--src/signal.c93
-rw-r--r--src/signal.h32
-rw-r--r--src/timing.c109
-rw-r--r--src/trFilelist.c449
30 files changed, 6795 insertions, 0 deletions
diff --git a/src/backup.c b/src/backup.c
new file mode 100644
index 0000000..3c59a9d
--- /dev/null
+++ b/src/backup.c
@@ -0,0 +1,108 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <regex.h> /* use POSIX in order to be portable to linux */
+
+extern int verbose;
+int backup = FALSE;
+char *pattern_baseDir = NULL;
+
+/* --------- for syncing all files with/without backup. */
+int nPattern=0; /* number of regular expression for backup files */
+regex_t ** fPatterns;/* array of pointers to regular expression */
+
+int set_nPattern(char * fpat_file)
+{
+ FILE *fd;
+ char pat[PATH_MAX];
+ int count = 0;
+ if ((fd = fopen(fpat_file, "r"))==NULL) {
+ fprintf(stderr, "Cannot open file -- %s\n", fpat_file);
+ return FAIL;
+ }
+ while (!feof(fd)) {
+ if (fscanf(fd, "%s", pat) == 1) {
+ ++count;
+ }
+ }
+ nPattern = count;
+ fclose(fd);
+ return SUCCESS;
+}
+
+int read_backup_pattern(char * fpat_file)
+{
+ FILE *fd;
+ char pat[PATH_MAX];
+ int i = 0;
+
+ if (!set_nPattern(fpat_file)) return FAIL;
+
+ fPatterns = malloc(sizeof(void *)*nPattern);
+ for(i = 0; i< nPattern; ++i) {
+ fPatterns[i] = (regex_t *) malloc(sizeof(regex_t));
+ }
+
+ fd = fopen(fpat_file, "r");
+
+ /*
+ if we don't prepend ^,
+ then pattern 'file' intended for files under srcBase
+ will select files with pattern = subdir/file.*
+ which is not our intention.
+ */
+ i = 0;
+ while (!feof(fd)) {
+ if (fscanf(fd, "%s", pat) == 1) {
+ char fullpat[PATH_MAX];
+ sprintf(fullpat, "^%s", pat);
+
+ regcomp(fPatterns[i], fullpat, REG_EXTENDED|REG_NOSUB);
+ ++i;
+ }
+ };
+ fclose(fd);
+ return SUCCESS;
+}
+
+int needBackup(char * filename) /* fullpath */
+{
+ /* we reach this point when the backup flag is true
+ if no_pattern -> backup all of them
+ if pattern
+ if match -> backup
+ nomatch -> no-backup
+ */
+ int i;
+ char *p;
+ if (!backup) return FALSE;
+ if (nPattern==0) return TRUE;
+
+ p = filename + strlen(pattern_baseDir) + 1; /* +1 to get pass the / after basedir */
+ /* fprintf(stderr, "nPattern = %d file= %s\n", nPattern, p); ********/
+ for(i=0; i<nPattern; ++i) {
+ if (regexec(fPatterns[i], p, (size_t)0, NULL, 0)!=0)
+ continue; /* no match */
+ return TRUE;
+ }
+ return FALSE;
+}
+
diff --git a/src/complaint_sender.c b/src/complaint_sender.c
new file mode 100644
index 0000000..53cfdb4
--- /dev/null
+++ b/src/complaint_sender.c
@@ -0,0 +1,158 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+
+/* complaint_send_socket */
+int complaint_fd;
+#ifndef IPV6
+struct sockaddr_in complaint_addr;
+#else
+struct sockaddr_in6 complaint_addr;
+#endif
+
+extern int my_FLOW_PORT;
+extern int verbose;
+
+int seq = 0;
+
+/* send buffer */
+char complaint_buffer[FLOW_BUFFSIZE];
+int *ccode_ptr; /* complain code ---- see main.h */
+int *cmid_ptr; /* which machine*/
+int *cfile_ptr; /* which file -- for missing page */
+int *npage_ptr; /* # of pages -- for missing page */
+int *pArray_ptr; /* missing page arrary */
+int *fill_ptr; /* point to next array element */
+
+/* ----------------------------------------------------------------
+ routines to fill in pArray with the missing page indexes
+ --------------------------------------------------------------- */
+void fill_in_int(int i)
+{
+ *fill_ptr++ = htonl(i);
+}
+
+void init_fill_ptr()
+{
+ fill_ptr = pArray_ptr;
+}
+
+/*----------------------------------------------------------
+ init_complaint_sender initializes the buffer to allow the
+ catcher to send complaints back to the sender.
+
+ ret_address of sender to whom we will complain
+ is determined when we receive the first UDP data
+ in read_handle_page() in page_reader.c
+ ----------------------------------------------------------*/
+void init_complaint_sender() /* (struct sockaddr_in *ret_addr) */
+{
+ /* ret_addr is sent by master, in network-byte-order */
+ if (verbose>=2)
+ fprintf(stderr, "in init_complaint_sender\n");
+ /* init the send_socket */
+ complaint_fd = complaint_socket(&complaint_addr, my_FLOW_PORT);
+
+ /* set up the pointers so we know where to put complaint_data */
+ ccode_ptr = (int *) complaint_buffer;
+ cmid_ptr = (int *)(ccode_ptr + 1);
+ cfile_ptr = (int *)(cmid_ptr + 1);
+ npage_ptr = (int *)(cfile_ptr + 1);
+ pArray_ptr= (int *)(npage_ptr + 1);
+}
+
+#ifndef IPV6
+void update_complaint_address(struct sockaddr_in *sa)
+{
+ sock_set_addr((struct sockaddr *) &complaint_addr,
+ sizeof(complaint_addr), (void*)&sa->sin_addr);
+}
+#else
+void update_complaint_address(struct sockaddr_in6 *sa)
+{
+ sock_set_addr((struct sockaddr *) &complaint_addr,
+ sizeof(complaint_addr), (void*)&sa->sin6_addr);
+}
+#endif
+
+/*------------------------------------------------------------------------
+ send_complaint fills the complaint buffer and send it through our socket
+ back to the sender
+
+ The major use is to tell master machine which pages of which file
+ needs to be re-transmitted.
+ complaint -- the complain code defined in main.h
+ mid -- machine id
+ file -- the file index
+ npage -- # of missing pages
+ followed by an array of missing page index [ page_1, page_2, ... ]
+
+ It is also used for sending back acknoledgement.
+ complaint -- the ack code defined in main.h in the same complaint section.
+ mid -- machine id
+ file -- which file
+ page -- seq number (out of seq complaints will be ignored by the catcher)
+ ------------------------------------------------------------------------*/
+void send_complaint(int complaint, int mid, int page, int file)
+{
+ /* fill in the complaint data */
+ /* 20060323 add converting to network byte-order before sending out */
+ int bytes;
+ *ccode_ptr = htonl(complaint);
+ *cmid_ptr = htonl(mid);
+ *cfile_ptr = htonl(file);
+ if (complaint==MISSING_PAGE || complaint==MISSING_TOTAL) {
+ *npage_ptr = htonl(page);
+ } else {
+ *npage_ptr = htonl(seq++);
+ }
+
+ bytes = (complaint==MISSING_PAGE) ? ((char*)fill_ptr - (char*)ccode_ptr)
+ : (char*)pArray_ptr - (char*)ccode_ptr;
+
+ /* send it */
+ if(sendto(complaint_fd, complaint_buffer, bytes, 0,
+ (const struct sockaddr *)&complaint_addr,
+ sizeof(complaint_addr)) < 0) {
+ perror("Sending complaint\n");
+ }
+ if (verbose>=2)
+ printf("Sent complaint:code=%d mid=%d page=%d file=%d bytes=%d\n",
+ complaint, mid, page, file, bytes);
+}
+
+
+
+
+
+
+
+
diff --git a/src/complaints.c b/src/complaints.c
new file mode 100644
index 0000000..fb48da2
--- /dev/null
+++ b/src/complaints.c
@@ -0,0 +1,579 @@
+/*
+ Copyright (C) 2005-2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+/*#include "paths.h"*/
+#include <sys/types.h>
+#include <time.h>
+#include <setjmp.h>
+#include <signal.h>
+
+extern int monitorID; /* defined in multicaster.c */
+extern int my_FLOW_PORT;
+extern int verbose;
+extern int nPages;
+extern char * cmd_name[];
+extern unsigned int total_pages, real_total_pages;
+extern off_t total_bytes, real_total_bytes;
+
+/* buffer for receiving complaints */
+char flow_buff[FLOW_BUFFSIZE];
+int *code_ptr; /* What's wrong? */
+int *mid_ptr; /* machine id */
+int *file_ptr; /* which file */
+int *npage_ptr; /* # of pages */
+int *pArray_ptr;/* missing page arrary */
+
+/* receive socket */
+int complaint_fd;
+#ifndef IPV6
+struct sockaddr_in complaint_addr;
+#else
+struct sockaddr_in6 complaint_addr;
+#endif
+
+/* status */
+char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */
+int *last_seq; /* array of size nMachines -- seq number of complaints */
+ /* *** watch out the max seq = 2e9 */
+int *total_missing_page; /* array of size nMachines -- persistent thru life of program*/
+char *file_received; /* array of size nMachines */
+char *bad_machines; /* array of size nMachines -- persistent thru life of program*/
+char *machine_status; /* array of size nMachines for ack */
+int *missing_pages; /* array of size nMachines */
+
+int nMachines;
+int has_missing; /* some machines have missing pages for this file (a flag)*/
+int has_sick; /* some machines are sick for this file (a flag)*/
+int skip_count=0; /* number of files that are not delivered */
+int quitWithOneBad=FALSE; /* default: continue with one or more (<all) bad machine */
+
+/* flow control */
+int my_ACK_WAIT_TIMES = ACK_WAIT_TIMES;
+
+/*
+ init_complaints initializes our buffers to receive complaint information
+ from the catchers
+*/
+void init_complaints()
+{
+ int rcv_size;
+
+ if (verbose>=2)
+ fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE);
+
+ /* get pointers set to the right place in buffer */
+ code_ptr = (int *) flow_buff;
+ mid_ptr = (int *) (code_ptr + 1);
+ file_ptr = (int *) (mid_ptr + 1);
+ npage_ptr = (int *) (file_ptr + 1);
+ pArray_ptr= (int *) (npage_ptr+1);
+
+ /* Receive socket (the default buffer size is 65535 bytes */
+ if (verbose>=2) printf("set up receive socket for complaints\n");
+ complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT);
+
+ /*
+ getsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &i, &il);
+ printf(" rcvbuf = %d type = %d\n", i, il);
+ exit(0);
+ the default in our machines -> size = 65535 and type = 4
+ */
+
+ rcv_size = TOTAL_REC_PAGE * FLOW_BUFFSIZE;
+ if (setsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){
+ perror("Expanding receive buffer for init_complaints");
+ }
+}
+
+void init_missing_page_flag(int n)
+{
+ int i;
+ nPages = n;
+ if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) {
+ fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n);
+ perror("error = ");
+ exit(0);
+ }
+ for(i=0; i<nPages; ++i) {
+ missing_page_flag[i] = RECEIVED;
+ }
+}
+
+void page_sent(int page)
+{
+ missing_page_flag[page] = RECEIVED;
+}
+
+void free_missing_page_flag()
+{
+ free(missing_page_flag);
+ missing_page_flag = NULL;
+}
+
+void set_has_missing()
+{
+ has_missing = TRUE;
+}
+
+void reset_has_missing()
+{
+ has_missing = FALSE;
+}
+
+void set_has_sick()
+{
+ has_sick = TRUE;
+}
+
+void reset_has_sick()
+{
+ has_sick = FALSE;
+}
+
+int has_sick_machines()
+{
+ return has_sick;
+}
+
+int has_missing_pages()
+{ /* originally, this fx use missing_page_flag[]
+ That will not work if the master did not receive missing page info */
+ return has_missing;
+}
+
+void refresh_missing_pages()
+{
+ int i;
+ for(i=0; i<nMachines; ++i) missing_pages[i] = 0;
+}
+
+void refresh_machine_status()
+{
+ int i;
+ for(i=0; i<nMachines; ++i) machine_status[i] = NOT_READY;
+}
+
+void mod_machine_status()
+{
+ /* take into account of machines which have received this file */
+ int i;
+ for(i=0; i<nMachines; ++i) {
+ if (file_received[i]==FILE_RECV) machine_status[i]=MACHINE_OK;
+ }
+}
+
+void refresh_file_received()
+{
+ int i;
+ for(i=0; i<nMachines; ++i) file_received[i] = NOT_RECV;
+}
+
+int nNotRecv()
+{
+ int i, c;
+ c = 0;
+ for(i=0; i<nMachines; ++i) {
+ if (file_received[i] == NOT_RECV) ++c;
+ }
+ return c;
+}
+
+int iNotRecv()
+{
+ /* find the first NotRecv machine */
+ int i;
+ for(i=0; i<nMachines; ++i) {
+ if (file_received[i] == NOT_RECV) return i;
+ }
+ return -1;
+}
+
+void init_machine_status(int n)
+{
+ int i;
+ nMachines = n;
+ machine_status = malloc(n * sizeof(char));
+ refresh_machine_status();
+
+ bad_machines = malloc(n * sizeof(char));
+ for(i=0; i<nMachines; ++i) {
+ bad_machines[i] = GOOD_MACHINE;
+ }
+
+ total_missing_page = malloc(n * sizeof(int));
+ for(i=0; i<nMachines; ++i) {
+ total_missing_page[i] = 0;
+ }
+
+ missing_pages = malloc(n * sizeof(int));
+ for(i=0; i<nMachines; ++i) {
+ missing_pages[i] = 0;
+ }
+
+ file_received = malloc(n * sizeof(char));
+ refresh_file_received();
+
+ skip_count = 0;
+
+ last_seq = malloc(n * sizeof(int));
+ for(i=0; i<nMachines; ++i) {
+ last_seq[i] = -1;
+ }
+}
+
+int get_total_missing_pages(int n)
+{
+ return total_missing_page[n];
+}
+
+int read_handle_complaint(int cmd)
+{
+ /*
+ cmd = cmd_code -- each cmd expects different 'response' (complaints)
+ return 1 for complaint handled
+ return 0 for irrelevant complaint
+ return -1 for time-out
+ */
+ int mid_v, code_v, file_v, npage_v, bytes_read;
+
+ if (readable(complaint_fd)) {
+ /* There is a complaint */
+ bytes_read = recvfrom(complaint_fd, flow_buff, FLOW_BUFFSIZE, 0, NULL, NULL);
+
+ /* 20060323 deal with big- vs little-endian issue
+ convert incoming integers into host representation */
+
+ mid_v = ntohl(*mid_ptr);
+
+ if ((bytes_read < FLOW_HEAD_SIZE) || (mid_v < 0 ) ||
+ (mid_v >= (unsigned int) nMachines) || /* boundary check for mid_v for safety */
+ (bad_machines[mid_v] == BAD_MACHINE)) { /* ignore complaint from a bad machine*/
+ return 0;
+ }
+
+ code_v = ntohl(*code_ptr);
+ file_v = ntohl(*file_ptr);
+ npage_v = ntohl(*npage_ptr);
+
+ /* check if the complaint is for the current file */
+ if (code_v != MONITOR_OK && file_v != current_entry()) return 0;
+ /* out of seq will be ignored */
+ if (code_v != MISSING_PAGE && code_v != MISSING_TOTAL) { /********* MISSING_TOTAL ? *************/
+ if (npage_v <= last_seq[mid_v]) return 0;
+ else last_seq[mid_v] = npage_v;
+ }
+
+ switch (code_v) {
+ case PAGE_RECV:
+ /******** check if machineID is the one we have set. */
+ /*if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID);*/
+ if (cmd == SENDING_DATA && mid_v == monitorID)
+ return 1;
+ else
+ return 0;
+
+ case MONITOR_OK:
+ /********* check if machineID is the one we have set. */
+ if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID);
+ if (cmd == SELECT_MONITOR_CMD && mid_v == monitorID)
+ return 1;
+ else
+ return 0;
+
+ case OPEN_OK :
+ if (cmd == OPEN_FILE_CMD) {
+ machine_status[mid_v] = MACHINE_OK;
+ return 1;
+ } else {
+ return 0;
+ }
+
+ case CLOSE_OK :
+ if (cmd == CLOSE_FILE_CMD || cmd == CLOSE_ABORT_CMD) {
+ machine_status[mid_v] = MACHINE_OK;
+ return 1;
+ } else {
+ return 0;
+ }
+
+ case EOF_OK :
+ if (cmd == EOF_CMD && file_received[mid_v]==NOT_RECV) {
+ machine_status[mid_v] = MACHINE_OK;
+ file_received[mid_v] = FILE_RECV;
+ return 1;
+ } else {
+ return 0;
+ }
+
+ case MISSING_PAGE :
+ if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0;
+ if (npage_v > nPages) return 0;
+ {
+ int i, *pi, page_v;
+ pi = pArray_ptr;
+ for (i = 0; i<npage_v; ++i) {
+ page_v = ntohl(pi[i]);
+ if (page_v<1 || page_v > nPages) continue; /*** make sure page_v starts with 1*/
+ missing_page_flag[page_v-1] = MISSING;
+ }
+ }
+ missing_pages[mid_v] += npage_v;
+ set_has_missing();
+ return 1;
+
+ case MISSING_TOTAL:
+ if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV || machine_status[mid_v] == MACHINE_OK)
+ return 0;
+ /* Consider to add: if npage_v >missing_pages[mid_v], ask to resend
+ [ likely no big gain ] */
+ total_missing_page[mid_v] += npage_v;
+ set_has_missing(); /* store the info about missing info */
+ machine_status[mid_v] = MACHINE_OK; /* machine_status serves as ack only */
+ return 1;
+
+ case SIT_OUT :
+ if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0;
+ fprintf(stderr, "*** %s sits-out-receiving %s\n",
+ id2name(mid_v), getFilename());
+ machine_status[mid_v] = MACHINE_OK;
+
+ if (!has_sick) ++skip_count;
+ set_has_sick();
+ return 1;
+
+ default :
+ if (verbose>=2) fprintf(stderr, "Unknown complaint: code = %d\n", code_v);
+ return 0;
+ } /* end of switch */
+ } /* end of if(readable) */
+
+ /* time out of readable() */
+ return -1;
+}
+
+int all_machine_ok()
+{
+ int i;
+ for(i=0; i<nMachines; ++i) {
+ if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE)
+ return FALSE; /* there is at least one machine that has not sent ack */
+ }
+ return TRUE; /* all machines are ready */
+}
+
+/* this is for the master to receive the acknowledgement. */
+void wait_for_ok(int code)
+{
+ int i, count, resp;
+ time_t tloc;
+ time_t rtime0, rtime1;
+
+ rtime0 = time(&tloc); /* reference time */
+
+ count = 0;
+ while (!all_machine_ok()) {
+ resp = read_handle_complaint(code);
+ if (resp==1) { /* if there is a complaint handled */
+ rtime0 = time(&tloc); /* reset the reference time */
+ continue;
+ }
+
+ if (resp==0) { /* irrelevant complaint received */
+ continue;
+ }
+
+ /* no complaints handled within the time period set by set_delay() */
+ rtime1 = time(&tloc); /* time since last complaints */
+ if ((rtime1-rtime0) >= ACK_WAIT_PERIOD) {
+ ++count;
+ if (count < my_ACK_WAIT_TIMES) {
+ if (verbose>=1 && (count % 10 == 0))
+ fprintf(stderr, " %d: resend cmd(%s) to machines:[ ", count, cmd_name[code]);
+ for(i=0; i<nMachines; ++i) {
+ if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) {
+ if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "%d ", i);
+ send_cmd(code, (int) i);
+ usleep(FAST);
+ }
+ }
+ if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "]\n");
+ rtime0 = rtime1;
+ } else { /* allowable period of time has passed */
+ fprintf(stderr, " Drop these bad machines:[ ");
+ for(i=0; i<nMachines; ++i) {
+ if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) {
+ /* fprintf(stderr, "%d ", i); */
+ fprintf(stderr, "%s ", id2name(i));
+ bad_machines[i]= BAD_MACHINE;
+ /*
+ The pages sent by the master will be ignored by
+ the bad machine, because the current_file nubmer
+ does not match.
+ */
+ }
+ }
+ fprintf(stderr, "]\n");
+ break;
+ }
+ }
+ }
+}
+
+int is_it_missing(int page)
+{
+ return (missing_page_flag[page]==MISSING) ? TRUE : FALSE;
+}
+
+int pr_missing_pages()
+{
+ int i, N, exit_code=0;
+ off_t delta;
+ unsigned int dp;
+
+ for(i=0; i<nMachines; ++i) {
+ char name[PATH_MAX];
+ N = get_total_missing_pages(i);
+ strcpy(name, id2name(i));
+ if (strlen(name)==0) sprintf(name, "machine(%3d)", i);
+ fprintf(stderr, "%s: #_missing_page_request = %6.2f%% = %d\n",
+ name, (double)N/((double)total_pages)*100.0, N);
+ }
+
+ if (skip_count>0) {
+ fprintf(stderr, "\nWarning: There are %d files which are not delivered.\n", skip_count);
+ exit_code = -1;
+ }
+
+ fprintf(stderr, "\nTotal number of files = %12d Pages w/o ack = %12u (%6.2f%%)\n",
+ total_entries(), pages_wo_ack(), (double)pages_wo_ack()/(double)real_total_pages*100.0);
+
+ dp = real_total_pages - total_pages;
+ fprintf(stderr, "Total number of pages = %12d Pages re-sent = %12u (%6.2f%%)\n",
+ total_pages, dp, (double)dp/(double)total_pages*100.0);
+
+ delta = (off_t)(real_total_bytes - total_bytes);
+ #ifdef _LARGEFILE_SOURCE
+ fprintf(stderr, "Total number of bytes = %12llu Bytes re-sent = %12llu (%6.2f%%)\n",
+ total_bytes, delta, (double)delta/(double)total_bytes*100.0);
+ #else
+ fprintf(stderr, "Total number of bytes = %12d Bytes re-sent = %12u (%6.2f%%)\n",
+ total_bytes, delta, (double)delta/(double)total_bytes*100.0);
+ #endif
+
+ return (exit_code);
+}
+
+/* count the number of bad machines */
+int nBadMachines()
+{
+ int i, count = 0;
+ for(i=0; i<nMachines; ++i) {
+ if (bad_machines[i] == BAD_MACHINE) ++count;
+ }
+
+ return count;
+}
+
+int choose_print_machines(char *stArray, char selection, char * msg_prefix)
+{
+ int i, count = 0;
+
+ for(i=0; i<nMachines; ++i) {
+ char line[PATH_MAX];
+ if (stArray[i] == selection) {
+ ++count;
+ strcpy(line, id2name(i));
+ if (count == 1) { fprintf(stderr, msg_prefix); }
+ if (strlen(line)==0)
+ fprintf(stderr, "%d ", i);
+ else
+ fprintf(stderr, "%s ", line);
+ }
+ }
+ if (count > 0) fprintf(stderr, "]\n");
+ return count;
+}
+
+int send_done_and_pr_msgs(double total_time, double t_page)
+{
+ int exit_code1 =0;
+ int exit_code2 =0;
+ int exit_code3 =0;
+
+ send_all_done_cmd();
+
+ /* exit_code1 !=0 if there are files that were not delivered due to change or skipped */
+ exit_code1 = pr_missing_pages();
+
+ fprintf(stderr, "Total time spent = %6.2f (min) ~ %6.2f (min/GB)\n\n",
+ total_time, total_time / ((double)real_total_bytes/1.0e9));
+ fprintf(stderr, "Send pages time = %6.2f (min) ~ %6.2f (min/GB)\n\n",
+ t_page, t_page / ((double)real_total_bytes/1.0e9));
+
+ exit_code2 = choose_print_machines(bad_machines,
+ BAD_MACHINE,
+ "Not synced for bad machines:[ ");
+
+ if (quitWithOneBad && nBadMachines() >=1) {
+ fprintf(stderr, "We choose to exit when at least one target is bad\n");
+ fprintf(stderr, "All files following the current one did not get delivered\n");
+ fprintf(stderr, "If resend cmd(CLOSE_FILE), then the current file may have been delivered to non-bad targets\n\n");
+ }
+
+ if (current_entry() < total_entries()) { /* if we exit prematurely */
+ exit_code3 = choose_print_machines(machine_status,
+ NOT_READY, "\nNot-ready machines:[ ");
+ }
+
+ if (verbose>=1) pr_rtt_hist();
+ return (exit_code1+exit_code3); /* 200807 removed exit_code2 because bad machines case has been dealt with
+ by -q. If no -q, then the bad machines are considered 'harmless' */
+}
+
+/* to do some cleanup before exit IF all machines are bad */
+void do_badMachines_exit()
+{
+ if ((quitWithOneBad && nBadMachines() < 1) ||
+ (!quitWithOneBad && (nBadMachines() < nMachines))) return;
+
+ if (quitWithOneBad)
+ fprintf(stderr, "One (or more) machine is bad. Exit!\n");
+ else
+ fprintf(stderr, "All machines are bad. Exit!\n");
+
+ send_done_and_pr_msgs(-1.0, -1.0);
+ exit(-1);
+}
+
+void do_cntl_c(int signo)
+{
+ fprintf(stderr, "Control_C interrupt detected!\n");
+
+ send_done_and_pr_msgs(-1.0, -1.0);
+ exit(-1);
+}
diff --git a/src/file_operations.c b/src/file_operations.c
new file mode 100644
index 0000000..d9a8733
--- /dev/null
+++ b/src/file_operations.c
@@ -0,0 +1,800 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file contains all the file-operations for multicatcher.
+ 20060404:
+ found a undetected omission in open_file().
+ AFter lseek(), we should write a dummy byte so that
+ multicatcher.zzz has the right file size to start with.
+ Otherwise, it will grow as syncing progresses.
+
+ Port the code to deal with Large_files.
+ esp in write_page(),
+ lseek(fout, (off_t)(page-1)*(off_t)PAGE_SIZE, SEEK_SET)
+ 200603:
+ Remove the meta-data operation.
+ Each file's info (stat) is transfered to targets during
+ the OPEN_FILE_CMD. extract_file_info() in this file
+ is to get that stat info for the current entry(file).
+
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Previously, memory mapped file was used for file IO
+ but later was changed to simple open() and lseek(), write().
+ This was also echoed in a patch by Clint Byrum <clint@careercast.com>.
+
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was originally called wish_list.c
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include <libgen.h>
+#include "main.h"
+
+extern int verbose;
+extern int machineID;
+
+char * baseDir = NULL;
+char * missingPages=NULL; /* starting address of the array of flags */
+int fout; /* file discriptor for output file */
+
+int toRmFirst = FALSE; /* remove existing file first and then sync */
+/* off_t total_bytes_written;*/
+unsigned int nPages;
+int had_done_zero_page;
+int current_file_id = 0; /* 1, 2, 3 ... or -1, -2 ... for backup */
+int backup; /* flag for a file that needs backup when deletion */
+char *backup_suffix = NULL;
+char my_backup_suffix[] = "_mcast_bakmmddhhmm";
+
+/* file stat_info which is transmitted from master */
+mode_t stat_mode;
+nlink_t stat_nlink;
+uid_t stat_uid;
+gid_t stat_gid;
+off_t stat_size;
+time_t stat_atime;
+time_t stat_mtime;
+
+char filename[PATH_MAX];
+char linktar[PATH_MAX];
+char fullpath[PATH_MAX];
+char tmp_suffix[L_tmpnam];
+
+void default_suffix()
+{
+ time_t t;
+ struct tm tm;
+ time(&t);
+ localtime_r(&t, &tm);
+ sprintf(my_backup_suffix, "_mcast_bak%02d%02d%02d%02d",
+ tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min);
+ backup_suffix = my_backup_suffix;
+ return;
+}
+
+void get_tmp_suffix()
+{
+ /* this is called once in each multicatcher */
+ char tmp[L_tmpnam];
+ tmpnam_r(&tmp[0]);
+ strcpy(tmp_suffix, basename(tmp));
+}
+
+int make_backup()
+{
+ char fnamebak[PATH_MAX];
+ if (strlen(fullpath) + strlen(backup_suffix) > (PATH_MAX-1)) {
+ fprintf(stderr, "backup filename too long\n");
+ return FAIL;
+ };
+
+ if (!backup) return SUCCESS; /* if not match with pattern, skip the backup */
+
+ sprintf(fnamebak, "%s%s", fullpath, backup_suffix);
+ /*
+ The backup scheme is as follows.
+ ln file file.bak (mv file file.bak would cause file to non-exist for a short while)
+ mv file.new file
+ */
+ if (link(fullpath, fnamebak) != 0) {
+ if (errno != ENOENT && errno != EINVAL) {
+ fprintf(stderr,"hardlink %s => %s : %s\n",fullpath, fnamebak, strerror(errno));
+ return FAIL;
+ }
+ }
+ if (verbose >= 2) {
+ fprintf(stderr, "backed up %s to %s\n",fullpath, fnamebak);
+ }
+ return SUCCESS;
+}
+
+void get_full_path(char * dest, char * sub_path)
+{
+ /* prepend the sub_path with baseDir --> dest */
+ strcpy(dest, baseDir);
+ strcat(dest, "/");
+ strcat(dest, sub_path);
+}
+
+void get_tmp_file(char * tmp)
+{
+ /*char *fncopy;*/
+ /* ******* change to filename_mcast.fileabc%$&? */
+ /* fncopy = strdup(filename); dirname change the string content */
+ strcpy(tmp, baseDir);
+ strcat(tmp, "/");
+ strcat(tmp, filename);
+ strcat(tmp, "_");
+ strcat(tmp, TMP_FILE);
+ strcat(tmp, tmp_suffix);
+ /* free(fncopy); */
+}
+
+int my_unlink(const char *fn)
+{
+ if (verbose>=2)
+ fprintf(stderr, "deleting file: %s\n", fn);
+ if (unlink(fn) != 0) {
+ if (errno==ENOENT) {
+ return SUCCESS;
+ } else {
+ /* NOTE: unlink() could not remove files which do not have w permission!*/
+ /* resort to shell command */
+ char cmd[PATH_MAX];
+ sprintf(cmd, "rm -f %s", fn);
+ if (system(cmd)!=0) {
+ fprintf(stderr, "'rm -f' fails for %s\n", fn);
+ return FAIL;
+ }
+ }
+ }
+ return SUCCESS;
+}
+
+int my_touch(const char*fn)
+{
+ int fo;
+ if( (fo = open(fn, O_RDWR | O_CREAT | O_TRUNC,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) < 0) {
+ perror(fn);
+ return FAIL;
+ }
+
+ /* set the size of the output file */
+ if(lseek(fo, 0, SEEK_SET) == -1) {
+ fprintf(stderr, "cannot seek for %s\n", fn);
+ perror(fn);
+ return FAIL;
+ }
+ close(fo);
+ return SUCCESS;
+}
+
+int extract_file_info(char * buf, int n_file, unsigned int n_pages)
+{
+ /*
+ Along with OPEN_FILE_CMD, the data area in rec_buf contains
+ (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0
+ where the stat_string contains the buf in
+ sprintf(buf, "%lu %lu %lu %lu %lu %lu %lu", st.st_mode, st.st_nlink,
+ st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime);
+ */
+ char * pc = &buf[0];
+
+ #ifdef _LARGEFILE_SOURCE
+ if (sscanf(pc, "%u %u %u %u %llu %lu %lu %d", &stat_mode, &stat_nlink,
+ &stat_uid, &stat_gid, &stat_size, &stat_atime, &stat_mtime,
+ &toRmFirst) != 8)
+ return FAIL;
+ #else
+ if (sscanf(pc, "%u %u %u %u %lu %lu %lu %d", &stat_mode, &stat_nlink,
+ &stat_uid, &stat_gid, &stat_size, &stat_atime, &stat_mtime,
+ &toRmFirst) != 8)
+ return FAIL;
+ #endif
+
+ /* fprintf(stderr, "size= %llu\n", stat_size); *********/
+
+ pc += (strlen(pc) + 1);
+ strcpy(filename, pc);
+ get_full_path(fullpath, filename);
+
+ linktar[0] = '\0';
+ if (S_ISLNK(stat_mode) || stat_nlink > 1) { /* if it is a softlink or hardlink */
+ pc += (strlen(pc) +1);
+ strcpy(linktar, pc);
+ }
+
+ nPages = n_pages;
+ current_file_id = n_file;
+ backup = (current_file_id < 0);
+ had_done_zero_page = FAIL;
+ /*total_bytes_written = 0;*/
+ return SUCCESS;
+}
+
+int open_file()
+{
+ int i;
+
+ /*
+ sometimes for disk space reason, it is necessary
+ to first remove the file and sync.
+ If toReplace is true, the backup option should be off.
+ */
+ if (toRmFirst) {
+ if (!my_unlink(fullpath)) {
+ fprintf(stderr, "Replacing ");
+ perror(filename);
+ return FAIL;
+ }
+ }
+
+ /* fprintf(stderr, "%d %d %d\n", stat_mode, stat_nlink, stat_size); *********/
+ if (S_ISREG(stat_mode) && stat_nlink == 1) {
+ /* if it is a regular file and not a hardlink */
+ char tmpFile[PATH_MAX];
+ get_tmp_file(tmpFile);
+
+ my_unlink(tmpFile); /* make sure it's not there from previous runs */
+
+ if( (fout = open(tmpFile, O_RDWR | O_CREAT | O_TRUNC,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) < 0) {
+ fprintf(stderr, "cannot open() %s for writing. \n", tmpFile);
+ perror(tmpFile);
+ return FAIL;
+ }
+
+ /* set the size of the output file (see Steve's book on page 411) */
+ if(lseek(fout, stat_size - 1 , SEEK_SET) == -1) {
+ #ifdef _LARGEFILE_SOURCE
+ fprintf(stderr, "lseek() error for %s with size = %llu\n", tmpFile, stat_size);
+ #else
+ fprintf(stderr, "lseek() error for %s with size = %u\n", tmpFile, (unsigned int)stat_size);
+ #endif
+ perror(tmpFile);
+ close(fout);
+ return FAIL;
+ }
+ if (write(fout, "", 1) != 1) {
+ #ifdef _LARGEFILE_SOURCE
+ fprintf(stderr, "write() error for %s with size = %llu\n", tmpFile, stat_size);
+ #else
+ fprintf(stderr, "write() error for %s with size = %u\n", tmpFile, (unsigned int)stat_size);
+ #endif
+ perror(tmpFile);
+ close(fout);
+ return FAIL;
+ }
+ }
+
+ /* init missingPages flags */
+ if (!missingPages) free(missingPages);
+ missingPages = malloc(sizeof(char) * nPages);
+ for(i=0; i < nPages; ++i) missingPages[i] = MISSING;
+
+ if (verbose>=2) fprintf(stderr, "Ready to receive file = %s\n", filename);
+ return SUCCESS;
+}
+
+int close_file()
+{
+ /* delete missingPages flags which was malloc-ed in open_file() */
+ free(missingPages);
+ missingPages = NULL;
+
+ if (S_ISREG(stat_mode) && stat_nlink == 1) {
+ /* if it is a regular file and not a hardlink */
+ char tmpFile[PATH_MAX];
+ struct stat stat;
+
+ if ((fout != -1) && (close(fout) != 0)) {
+ #ifdef _LARGEFILE_SOURCE
+ fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %llu \n",
+ filename, stat_size);
+ #else
+ fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %u \n",
+ filename, (unsigned int)stat_size);
+ #endif
+ perror("close");
+ return FAIL;
+ }
+ fout = -1; /* if the following fails, the reentry wont do munmap() */
+
+ /* the real work */
+ /* get_full_path(oldFile, filename); **** unnecessary fullpath is the oldFile */
+
+ get_tmp_file(tmpFile);
+
+ /* 8/14/2002
+ If there was a hardware (disk IO) problem
+ the sync should not proceed.
+ ** Add the following checking.
+ */
+ if (lstat(tmpFile, &stat)<0) {
+ perror("ERROR: close_file() cannot lstat the tmp file\n");
+ return FAIL;
+ }
+
+ if (backup && !make_backup(fullpath)) { /* make a hardlink oldFile => backup_file */
+ fprintf(stderr, "fail to make backup for %s\n", fullpath);
+ return FAIL;
+ }
+ if (rename(tmpFile, fullpath)<0) {
+ perror("ERROR: close_file():rename() \n");
+ return FAIL;
+ }
+
+ /* 20071016 in rare occasion, the written file has not the right size */
+ if (lstat(fullpath, &stat)<0) {
+ fprintf(stderr, "ERROR: close_file() cannot lstat %s\n", fullpath);
+ return FAIL;
+ }
+ if (stat_size != stat.st_size) {
+ fprintf(stderr, "ERROR: close_file() filesize != incoming-size\n");
+ return FAIL;
+ }
+ }
+
+ /* for debug
+ if (verbose>=2) fprintf(stderr, "total bytes written %llu for file %s\n",
+ total_bytes_written, filename);
+ */
+
+ return SUCCESS;
+}
+
+int rm_tmp_file()
+{
+ char tmpFile[PATH_MAX];
+
+ if ((fout != -1) && (close(fout) != 0)) {
+ #ifdef _LARGEFILE_SOURCE
+ fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %llu \n",
+ filename, stat_size);
+ #else
+ fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %u \n",
+ filename, (unsigned int)stat_size);
+ #endif
+ perror("rm_tmp");
+ return FAIL;
+ }
+ fout = -1;
+
+ get_tmp_file(tmpFile);
+
+ return (my_unlink(tmpFile));
+}
+
+int nPages_for_file()
+{
+ return nPages;
+};
+
+/* return total number of missing pages */
+int get_missing_pages()
+{
+ int i, result=0;
+
+ for(i=0; i < nPages; ++i)
+ if ((missingPages[i]) == MISSING) ++result;
+ return result;
+}
+
+int is_missing(int index)
+{
+ return (missingPages[index] == MISSING) ? TRUE : FALSE;
+}
+
+void page_received(int index)
+{
+ missingPages[index] = RECEIVED;
+}
+
+/*
+ write() in write_page() may block forever.
+ This function is to check if write() is ready.
+*/
+int writable(int fd)
+{
+ struct timeval write_tv;
+ fd_set wset;
+ FD_ZERO(&wset);
+ FD_SET(fd, &wset);
+
+ write_tv.tv_sec = WRITE_WAIT_SEC;
+ write_tv.tv_usec = WRITE_WAIT_USEC;
+ return (select(fd + 1, NULL, &wset, NULL, &write_tv)==1);
+}
+
+void write_page(int page, char *data_ptr, int bytes)
+{
+ /* page = page number starting with 1 */
+ if (page < 1 || page > nPages) return;
+
+ /* Do we need to write this page? */
+ if (is_missing(page-1)){
+ if (!writable(fout)) return;
+
+ /* Write the data */
+ if (lseek(fout, (off_t)(page-1)*(off_t)PAGE_SIZE, SEEK_SET)<0) {
+ if (verbose>=1) {
+ fprintf(stderr, "ERROR: write_page():lseek() at page %d for %s\n",
+ page, filename);
+ perror("ERROR");
+ }
+ return;
+ }
+ if (write(fout, data_ptr, bytes)<0) {
+ /* write IO error !!! */
+ perror("ERROR");
+ fprintf(stderr, "write_page():write() error: at page %d for %s\n", page, filename);
+ return;
+ }
+
+ /* Mark the page as received in our wish list */
+ page_received(page-1);
+ /*total_bytes_written += bytes;*/
+ } else {
+ /* If we don't need to write it, just return */
+ if (verbose >=2) {
+ fprintf(stderr, "Already have page %d for %d:Ignoring\n", page, current_file_id);
+ }
+ }
+ return;
+}
+
+/* For files whose size is 0 */
+int touch_file()
+{
+ if (verbose >=2)
+ fprintf(stderr, "touching file: %s\n", fullpath);
+
+ my_unlink(fullpath);
+ return my_touch(fullpath);
+ /* system()
+ VERY time in-efficient
+ char cmd[PATH_MAX];
+ sprintf(cmd, "touch %s", fullpath);
+ return (system(cmd)==0);
+ */
+}
+
+int delete_file(int to_check_dir_type)
+{
+ struct stat st;
+ char fp[PATH_MAX];
+ int trailing_slash;
+ int type_checking;
+
+ strcpy(fp, fullpath);
+
+ /* remove trailing slash if any -- for deletion-'type' checking */
+ if (to_check_dir_type) {
+ char *pc;
+ type_checking = TRUE;
+ pc = &fp[0] + strlen(fp) - 1;
+ if (*pc=='/') {
+ *pc = '\0';
+ trailing_slash = TRUE;
+ } else {
+ trailing_slash = FALSE;
+ };
+ } else {
+ type_checking = FALSE;
+ }
+
+ if(lstat(fp, &st) < 0) {
+ /* already gone ? */
+ return SUCCESS;
+ }
+ if (S_ISREG(st.st_mode) || S_ISLNK(st.st_mode)) { /* delete a file or link */
+ if (verbose>=2)
+ fprintf(stderr, "deleting file: %s\n", fp);
+
+ if (type_checking && trailing_slash) { /* intended to remove a directory when it is not */
+ return FAIL;
+ }
+
+ if (backup && S_ISREG(st.st_mode) && !make_backup(fp)) {/* backup regular file */
+ return FAIL; /* failed to make_backup */
+ }
+ return (my_unlink(fp));
+ } else if (S_ISDIR(st.st_mode)) { /* remove a directory */
+ char cmd[PATH_MAX];
+ if (verbose>=2)
+ fprintf(stderr, "deleting directory: %s\n", fp);
+
+ if (type_checking && (!trailing_slash)) { /* intended to remove a non-dir when it is directory */
+ return FAIL;
+ }
+
+ sprintf(cmd, "rm -rf %s", fp); /* remove everything in dir, watch out for this */
+ return (system(cmd)==0);
+ }
+ /* not file, link, directory */
+ fprintf(stderr, "unrecognized file_mode for %s\n", fp);
+ return FAIL;
+}
+
+/* send complaints to the master for missing data */
+int ask_for_missing_page()
+{
+ int i, n=0, total=0;
+
+ /*
+ Send missing page indexes if any
+ */
+ init_fill_ptr();
+ for(i=0; i < nPages; ++i) {
+ if (missingPages[i] == MISSING ) {
+ ++n;
+ ++total;
+ if (n > MAX_NPAGE) {
+ /* send previous missing page-indexes */
+ send_complaint(MISSING_PAGE, machineID, MAX_NPAGE, current_file_id);
+ init_fill_ptr();
+ n = 1;
+ }
+ /* fill in one page index */
+ fill_in_int(i+1); /* origin = 1 */
+ }
+ }
+ /* send the rest of missing pages complaint */
+ if (n>0) send_complaint(MISSING_PAGE, machineID, n, current_file_id);
+
+ return total; /* there are missing pages */
+}
+
+void missing_page_stat()
+{
+ int i, n=0, sum=0, last=-1;
+
+ for(i=0; i < nPages; ++i) {
+ if (missingPages[i] == MISSING ) {
+ ++n;
+ if (last<0) {
+ sum += i;
+ } else {
+ sum += (i - last);
+ }
+ last = i;
+ }
+ }
+ if (n>0) {
+ double a = sum;
+ double b = n;
+ fprintf(stderr, "file= %d miss= %d out-of %d avg(delta_index) = %f\n",
+ current_file_id, n, nPages, a/b);
+ }
+}
+
+void my_perror(char * msg)
+{
+ char fn[PATH_MAX];
+ sprintf(fn, "%s - %s", fullpath, msg);
+ perror(fn);
+}
+
+int set_owner_perm_times()
+{
+ int state = SUCCESS;
+
+ /* set owner */
+ if (lchown(fullpath, stat_uid, stat_gid)!=0) {
+ my_perror("chown");
+ state = FAIL;
+ }
+
+ /*
+ set time and permission.
+ Don't try to set the time and permission on a link
+ */
+ if (!S_ISLNK(stat_mode)) {
+ struct utimbuf times;
+ if (chmod(fullpath, stat_mode)!=0) {
+ my_perror("chmod");
+ state = FAIL;
+ }
+
+ times.actime = stat_atime;
+ times.modtime = stat_mtime;
+ if (utime(fullpath, &times)!=0) {
+ my_perror("utime");
+ state = FAIL;
+ }
+ }
+ return state;
+}
+
+int update_directory()
+{
+ struct stat st;
+ int exists;
+ char fp[PATH_MAX], *pc;
+
+ if (verbose>=2)
+ fprintf(stderr, "Updating dir: %s\n", fullpath);
+
+ /* if fullpath is a softlink that points to a dir,
+ and it has a trailing '/',
+ lstat() will view it as a directory !
+ So, we remove the trailing '/' before lstat() */
+ strcpy(fp, fullpath);
+ pc = &fp[0] + strlen(fp) - 1;
+ if (*pc=='/') *pc = '\0';
+
+ if(lstat(fp, &st) < 0) {
+ switch(errno) {
+ case ENOENT:
+ exists = FALSE;
+ break;
+ default:
+ my_perror("lstat");
+ return FAIL;
+ }
+ } else {
+ exists = TRUE;
+ }
+
+ if (!exists) {
+ /* There's nothing there, so create dir */
+ if (mkdir(fp, stat_mode) < 0){
+ my_perror("mkdir");
+ return FAIL;
+ }
+ return SUCCESS;
+ } else if (!S_ISDIR(st.st_mode)) {
+ /* If not a directory delete what is there */
+ if (unlink(fp)!=0){
+ my_perror("unlink");
+ return FAIL;
+ }
+ if (verbose>=2)
+ fprintf(stderr, "Deleted file %s to replace with directory\n", fp);
+ if (mkdir(fp, stat_mode) < 0){
+ my_perror("mkdir");
+ return FAIL;
+ }
+ return SUCCESS;
+ } else {
+ /* If dir exists, just chmod */
+ chmod(fp, stat_mode);
+ /*** 20070410: changing mtime of a dir can cause NFS to confuse
+ See http://lists.samba.org/archive/rsync/2004-May/009439.html
+ So, I comment it out in the following
+
+ struct utimbuf times;
+ times.actime = stat_atime;
+ times.modtime = stat_mtime;
+ ***/
+ /*
+ if (utime(fullpath, &times) < 0) {
+ my_perror("utime");
+ }
+ */
+ return SUCCESS;
+ }
+}
+
+int update_directory0()
+{
+ DIR *d;
+
+ struct utimbuf times;
+ times.actime = stat_atime;
+ times.modtime = stat_mtime;
+
+ if (verbose>=2)
+ fprintf(stderr, "Creating dir: %s\n", fullpath);
+
+ d = opendir(fullpath);
+ if (d == NULL) {
+ switch(errno) {
+ case ENOTDIR:
+ /* If not a directory delete what is there */
+ if (unlink(fullpath)!=0){
+ my_perror("unlink");
+ return FAIL;
+ }
+ if (verbose>=2)
+ fprintf(stderr, "Deleted file %s to replace with directory\n", fullpath);
+ /* Fall through to ENOENT */
+ case ENOENT:
+ /* There's nothing there, so create dir */
+ if (mkdir(fullpath, stat_mode) < 0){
+ my_perror("mkdir");
+ return FAIL;
+ }
+ return SUCCESS;
+ default:
+ my_perror("opendir");
+ return FAIL;
+ }
+ } else {
+ /* If dir exists, just chmod */
+ closedir(d);
+ chmod(fullpath, stat_mode);
+ /*** 20070410: changing mtime of a dir can cause NFS to confuse
+ See http://lists.samba.org/archive/rsync/2004-May/009439.html
+ So, I comment it out in the following ***/
+ /*
+ if (utime(fullpath, &times) < 0) {
+ my_perror("utime");
+ }
+ */
+ return SUCCESS;
+ }
+}
+
+int check_zero_page_entry()
+{
+ /* when total_pages = 0, the entry can be
+ an empty file
+ softlink (hardlink),
+ a directory
+ */
+ if (had_done_zero_page) return SUCCESS; /* to avoid doing it again */
+
+ if (S_ISDIR(stat_mode)) { /* a directory */
+ if (!update_directory()) {
+ had_done_zero_page = FAIL;
+ return FAIL;
+ }
+ } else if (S_ISLNK(stat_mode)) { /* Is it a softlink? */
+ if (verbose>=2)
+ fprintf(stderr, "Making softlink: %s -> %s\n", fullpath, linktar);
+ delete_file(FALSE); /* remove the old one at fullpath */
+ if (symlink(linktar, fullpath) < 0) {
+ my_perror("symlink");
+ had_done_zero_page = FAIL;
+ return FAIL;
+ }
+ } else if (stat_nlink > 1) { /* hardlink */
+ char fn[PATH_MAX];
+ get_full_path(fn, linktar); /* linktar is a relative path from synclist */
+ if (verbose>=2)
+ fprintf(stderr, "Making a hardlink: %s => %s\n", fullpath, fn);
+ my_unlink(fullpath); /* remove the old one */
+ if (link(fn, fullpath)!=0) {
+ my_perror("link");
+ had_done_zero_page = FAIL;
+ return FAIL;
+ }
+ } else {
+ /* it must be a regular file */
+
+ if (!touch_file()) {
+ had_done_zero_page = FAIL;
+ return FAIL;
+ } else {
+ set_owner_perm_times();
+ }
+ }
+ had_done_zero_page = SUCCESS;
+ return SUCCESS;
+}
+
diff --git a/src/global.c b/src/global.c
new file mode 100644
index 0000000..64104eb
--- /dev/null
+++ b/src/global.c
@@ -0,0 +1,35 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+char *cmd_name[] = { "TIME-OUT",
+ "TEST",
+ "SEND_DATA",
+ "RESEND_DATA",
+ "OPEN_FILE",
+ "EOF",
+ "CLOSE_FILE",
+ "CLOSE_ABORT",
+ "ALL_DONE",
+ "SEL_MONITOR",
+ "NULL"};
+
+int verbose = 0; /* = 0 little detailed output, = 1,2 ... = n a lot more details */
+int machineID = -1; /* used for multicatcher */
+
diff --git a/src/id_map.c b/src/id_map.c
new file mode 100644
index 0000000..8b8dcaa
--- /dev/null
+++ b/src/id_map.c
@@ -0,0 +1,74 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <string.h>
+
+void strip(char * str);
+
+/* place to hold the array of string */
+char ** machine_names = NULL; /* array of (char*) */
+int nTargets=0;
+
+void get_machine_names(char * filename)
+{
+ FILE *fd;
+ char line[PATH_MAX];
+ int count=0;
+
+ if ((fd = fopen(filename, "r")) == NULL) {
+ fprintf(stderr, "Cannot open file -- %s \n", filename);
+ return;
+ }
+ while (fgets(line, PATH_MAX, fd) != NULL) {
+ strip(line);
+ if (strlen(line) != 0) ++count;
+ }
+ if (count == 0) {
+ fclose(fd);
+ fprintf(stderr, "No machine names in the file = %s\n", filename);
+ return;
+ }
+
+ nTargets = count;
+
+ rewind(fd);
+ machine_names = malloc(nTargets * sizeof(void*));
+
+ line[0] = '\0';
+ count = 0;
+ while(fgets(line, PATH_MAX, fd) != NULL) {
+ strip(line);
+ if (strlen(line)==0) continue;
+ machine_names[count] = (char*)strdup(line);
+ line[0] = '\0';
+ ++count;
+ }
+ fclose(fd);
+}
+
+char * id2name(int id)
+{
+ return (machine_names) ? machine_names[id] : "";
+}
diff --git a/src/main.h b/src/main.h
new file mode 100644
index 0000000..48f8c55
--- /dev/null
+++ b/src/main.h
@@ -0,0 +1,189 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Following the suggestion and the patch by Clint Byrum <clint@careercast.com>,
+ I added more control to selectively print out messages.
+ The control is done by the statement 'if (version >= n)'
+
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef __main_h
+#define __main_h
+
+#include <dirent.h>
+#include <time.h>
+#include <utime.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
+#include <arpa/inet.h> /* inet(3) functions */
+#include <errno.h>
+#include <fcntl.h> /* for nonblocking */
+#include <sys/ioctl.h>
+#include <netdb.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/stat.h> /* for S_xxx file mode constants */
+#include <sys/uio.h> /* for iovec{} and readv/writev */
+#include <unistd.h>
+#include <sys/wait.h>
+#include <sys/time.h> /* timeval{} for select() */
+#include <sys/times.h>
+#include <limits.h>
+
+#define VERSION "4.0.1"
+
+/* logic values */
+#define FALSE 0
+#define TRUE 1
+#define FAIL (FALSE)
+#define SUCCESS (TRUE)
+#define GOOD_EXIT 0
+#define BAD_EXIT -1
+
+/* Ports and addresses */
+#define PORT 8888 /* for multicast */
+#define FLOW_PORT (PORT-1) /* for flow-control */
+#define MCAST_ADDR "239.255.67.92"
+#define MCAST_TTL 1
+#define MCAST_LOOP FALSE
+#define MCAST_IF NULL
+
+/*
+ Handling socket's receive buffer on the target machine:
+ if the available data size in the receiveing buffer is larger
+ than TOO_MUCH then a TOO_FAST complaint is triggered.
+ The master will then sleep for USEC_TO_IDLE
+ Currently, this is not effective.
+*/
+#define TOO_FAST_LIMIT (TOTAL_REC_PAGE / 2) /* if half is full, then too fast */
+#define TOO_MUCH (TOO_FAST_LIMIT * PAGE_BUFFSIZE)
+#define USEC_TO_IDLE 1000000
+
+/* TIMING stuff */
+#define FAST 5000 /* usec */
+#define DT_PERPAGE 6000 /* usec */
+#define FACTOR 90 /* interpage interval = FACTOR * DT_PERPAGE or DT_PERPAGE*/
+#define SECS_FOR_KILL 30 /* time(sec) allowed for 'kill -9 pid' to finish */
+
+/* time for the master to wait for the acknowledgement */
+#define ACK_WAIT_PERIOD 1 /* secs (from time()); */
+#define ACK_WAIT_TIMES 60 /* wait for this many periods */
+
+#define SICK_RATIO (0.9)
+#define SICK_THRESHOLD (50) /* SICK FOR such many TIMES is really sick */
+
+/* max wait time for write() a page of PAGE_SIZE -- 100 msec */
+#define WRITE_WAIT_SEC 0
+#define WRITE_WAIT_USEC 100000
+
+#define SET_MON_WAIT_TIMES 6000 /* time = this number * FAST */
+#define NO_FEEDBACK_COUNT_MAX 10
+#define SWITCH_THRESHOLD 50 /* to avoid switching monitor too frequently
+ because of small diff in missing_pages */
+
+/* complaints */
+#define TOO_FAST 100
+#define OPEN_OK 200
+#define CLOSE_OK 300
+#define MISSING_PAGE 400
+#define MISSING_TOTAL 500
+#define EOF_OK 600
+#define SIT_OUT 700
+#define PAGE_RECV 800
+#define MONITOR_OK 900
+
+/* Sizes */
+/* 20060427: removed size_t which is arch-dependent */
+#define PAGE_SIZE 64512
+#define HEAD_SIZE (sizeof(int) + 2 * sizeof(int) + 2 * sizeof(int))
+#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE)
+#define TOTAL_REC_PAGE 20 /* change to 4 in case hit the OS limit in buf size */
+
+#define FLOW_HEAD_SIZE (sizeof(int)*4)
+#define FLOW_BUFFSIZE (PAGE_SIZE+FLOW_HEAD_SIZE)
+#define MAX_NPAGE (PAGE_SIZE / sizeof(int))
+/*
+ Modes and command codes:
+ The numerical codes are also the index to retrieve the command names
+ for printing in complaints.c
+*/
+#define TIMED_OUT 0
+#define TEST 1
+#define SENDING_DATA 2
+#define RESENDING_DATA 3
+#define OPEN_FILE_CMD 4
+#define EOF_CMD 5
+#define CLOSE_FILE_CMD 6
+#define CLOSE_ABORT_CMD 7
+#define ALL_DONE_CMD 8
+#define SELECT_MONITOR_CMD 9
+#define NULL_CMD 10
+
+/* machine status ----- for caster */
+#define MACHINE_OK_MISSING_PAGES '\2'
+#define MACHINE_OK '\1'
+#define NOT_READY '\0'
+
+#define BAD_MACHINE '\1'
+#define GOOD_MACHINE '\0'
+
+#define FILE_RECV '\1'
+#define NOT_RECV '\0'
+
+/* representation of all-targets for sends */
+#define ALL_MACHINES -1
+
+/* PAGE STATUS */
+#define MISSING '\0'
+#define RECEIVED '\1'
+
+/* MACHINE STATE ----- for catcher */
+#define IDLE_STATE 0
+#define GET_DATA_STATE 1
+#define DATA_READY_STATE 2
+#define SICK_STATE 3
+
+/*
+ The following two are info to be packed into
+ meta data to represent either file or directory deletion.
+*/
+/* SPECIAL # of PAGES to signal deleting action */
+#define TO_DELETE (-1)
+
+/* temporary file name prefix for transfering to */
+#define TMP_FILE "mrsync."
+
+#include "proto.h"
+
+#endif
diff --git a/src/main.h.in b/src/main.h.in
new file mode 100644
index 0000000..600c491
--- /dev/null
+++ b/src/main.h.in
@@ -0,0 +1,189 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Following the suggestion and the patch by Clint Byrum <clint@careercast.com>,
+ I added more control to selectively print out messages.
+ The control is done by the statement 'if (version >= n)'
+
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef __main_h
+#define __main_h
+
+#include <dirent.h>
+#include <time.h>
+#include <utime.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
+#include <arpa/inet.h> /* inet(3) functions */
+#include <errno.h>
+#include <fcntl.h> /* for nonblocking */
+#include <sys/ioctl.h>
+#include <netdb.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/stat.h> /* for S_xxx file mode constants */
+#include <sys/uio.h> /* for iovec{} and readv/writev */
+#include <unistd.h>
+#include <sys/wait.h>
+#include <sys/time.h> /* timeval{} for select() */
+#include <sys/times.h>
+#include <limits.h>
+
+#define VERSION "@VERSION@"
+
+/* logic values */
+#define FALSE 0
+#define TRUE 1
+#define FAIL (FALSE)
+#define SUCCESS (TRUE)
+#define GOOD_EXIT 0
+#define BAD_EXIT -1
+
+/* Ports and addresses */
+#define PORT 8888 /* for multicast */
+#define FLOW_PORT (PORT-1) /* for flow-control */
+#define MCAST_ADDR "239.255.67.92"
+#define MCAST_TTL 1
+#define MCAST_LOOP FALSE
+#define MCAST_IF NULL
+
+/*
+ Handling socket's receive buffer on the target machine:
+ if the available data size in the receiveing buffer is larger
+ than TOO_MUCH then a TOO_FAST complaint is triggered.
+ The master will then sleep for USEC_TO_IDLE
+ Currently, this is not effective.
+*/
+#define TOO_FAST_LIMIT (TOTAL_REC_PAGE / 2) /* if half is full, then too fast */
+#define TOO_MUCH (TOO_FAST_LIMIT * PAGE_BUFFSIZE)
+#define USEC_TO_IDLE 1000000
+
+/* TIMING stuff */
+#define FAST 5000 /* usec */
+#define DT_PERPAGE 6000 /* usec */
+#define FACTOR 90 /* interpage interval = FACTOR * DT_PERPAGE or DT_PERPAGE*/
+#define SECS_FOR_KILL 30 /* time(sec) allowed for 'kill -9 pid' to finish */
+
+/* time for the master to wait for the acknowledgement */
+#define ACK_WAIT_PERIOD 1 /* secs (from time()); */
+#define ACK_WAIT_TIMES 60 /* wait for this many periods */
+
+#define SICK_RATIO (0.9)
+#define SICK_THRESHOLD (50) /* SICK FOR such many TIMES is really sick */
+
+/* max wait time for write() a page of PAGE_SIZE -- 100 msec */
+#define WRITE_WAIT_SEC 0
+#define WRITE_WAIT_USEC 100000
+
+#define SET_MON_WAIT_TIMES 6000 /* time = this number * FAST */
+#define NO_FEEDBACK_COUNT_MAX 10
+#define SWITCH_THRESHOLD 50 /* to avoid switching monitor too frequently
+ because of small diff in missing_pages */
+
+/* complaints */
+#define TOO_FAST 100
+#define OPEN_OK 200
+#define CLOSE_OK 300
+#define MISSING_PAGE 400
+#define MISSING_TOTAL 500
+#define EOF_OK 600
+#define SIT_OUT 700
+#define PAGE_RECV 800
+#define MONITOR_OK 900
+
+/* Sizes */
+/* 20060427: removed size_t which is arch-dependent */
+#define PAGE_SIZE 64512
+#define HEAD_SIZE (sizeof(int) + 2 * sizeof(int) + 2 * sizeof(int))
+#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE)
+#define TOTAL_REC_PAGE 20 /* change to 4 in case hit the OS limit in buf size */
+
+#define FLOW_HEAD_SIZE (sizeof(int)*4)
+#define FLOW_BUFFSIZE (PAGE_SIZE+FLOW_HEAD_SIZE)
+#define MAX_NPAGE (PAGE_SIZE / sizeof(int))
+/*
+ Modes and command codes:
+ The numerical codes are also the index to retrieve the command names
+ for printing in complaints.c
+*/
+#define TIMED_OUT 0
+#define TEST 1
+#define SENDING_DATA 2
+#define RESENDING_DATA 3
+#define OPEN_FILE_CMD 4
+#define EOF_CMD 5
+#define CLOSE_FILE_CMD 6
+#define CLOSE_ABORT_CMD 7
+#define ALL_DONE_CMD 8
+#define SELECT_MONITOR_CMD 9
+#define NULL_CMD 10
+
+/* machine status ----- for caster */
+#define MACHINE_OK_MISSING_PAGES '\2'
+#define MACHINE_OK '\1'
+#define NOT_READY '\0'
+
+#define BAD_MACHINE '\1'
+#define GOOD_MACHINE '\0'
+
+#define FILE_RECV '\1'
+#define NOT_RECV '\0'
+
+/* representation of all-targets for sends */
+#define ALL_MACHINES -1
+
+/* PAGE STATUS */
+#define MISSING '\0'
+#define RECEIVED '\1'
+
+/* MACHINE STATE ----- for catcher */
+#define IDLE_STATE 0
+#define GET_DATA_STATE 1
+#define DATA_READY_STATE 2
+#define SICK_STATE 3
+
+/*
+ The following two are info to be packed into
+ meta data to represent either file or directory deletion.
+*/
+/* SPECIAL # of PAGES to signal deleting action */
+#define TO_DELETE (-1)
+
+/* temporary file name prefix for transfering to */
+#define TMP_FILE "mrsync."
+
+#include "proto.h"
+
+#endif
diff --git a/src/multicaster.c b/src/multicaster.c
new file mode 100644
index 0000000..3b26cae
--- /dev/null
+++ b/src/multicaster.c
@@ -0,0 +1,582 @@
+/*
+ Copyright (C) 2006-2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ version 3.0 major update
+ -- large file support
+ -- platform independence (between linux, unix)
+ -- backup feature (as in rsync)
+ -- removing meta-file-info
+ -- catching slow machine as the feedback monitor
+ -- mcast options
+ version 3.0.[1-9] bug fixes
+ -- logic flaw which under certain condition
+ caused premature dropout due to
+ unsuccessful EOF, CLOSE_FILE
+ and caused unwanranted SIT-OUT cases.
+ -- tested on Debian 64 bit arch by Nicolas Marot in France
+ version 3.1.0
+ -- codes for IPv6 are ready (but not tested)
+ IPv4 is tested ok.
+ version 3.2.0
+ -- monitor change improvement
+ -- handshake improvement (e.g. seq #)
+ -- if one machine skips a file, all will NOT close()
+ version 4.0 major update
+ -- consolidate sending missing pages in complaint flow
+ cutting the messages by one order magnitude
+ -- exit code adjustment
+
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <stdlib.h>
+#include <limits.h> /* to define PATH_MAX */
+#include <sys/times.h>
+#include <setjmp.h>
+#include <signal.h>
+
+extern int verbose;
+extern char * machine_list_file; /* defined in complaints.c */
+extern char * bad_machines; /* array of size nMachines, defined in complaints.c */
+extern char * file_received;
+extern int * missing_pages;
+extern int backup;
+extern int nPattern;
+extern char * pattern_baseDir;
+extern int nTargets;
+extern int my_ACK_WAIT_TIMES;
+extern int toRmFirst;
+extern int quitWithOneBad;
+extern int skip_count;
+extern int file_changed;
+
+char * my_MCAST_ADDR = MCAST_ADDR;
+int my_FLOW_PORT = FLOW_PORT;
+int my_PORT = PORT;
+int my_TTL = MCAST_TTL;
+int my_LOOP = MCAST_LOOP;
+char * my_IFname = MCAST_IF;
+
+int monitorID = -1;
+
+int no_feedback_count;
+
+void usage()
+{
+ fprintf(stderr,
+ "multicaster (to copy files to many multicatchers) version %s)\n"
+ " Option list:\n"
+ " [ -v <verbose_level 0-2> ]\n"
+ " [ -w <ack_wait_times default= %d (secs) ]\n"
+ " [ -X toRmFirst_flag default= cp2tmp_and_mv ]\n"
+ " [ -q quit_with_1_bad defalut= quit_with_all_bad ]\n"
+ " -------- essential options ----------------------------------\n"
+ " -m <machine_list_filePath>\n"
+ " -s <data_source_path>\n"
+ " -f <synclist_filePath>\n"
+ " -------- options for backup ---------------------------------\n"
+ " [ -b flag to turn on backup ]\n"
+ " [ -r <filepath> for regex patterns for files needing backup ]\n"
+ " [ -d <basedir> for regex patterns ]\n"
+ " -------- mcast options --------------------------------------\n"
+ " [ -A <my_mcast_address default=%s> **same as for multicatcher ]\n"
+ " [ -P <my_PORT default=%d> **same as for multicatcher ]\n"
+ " [ -T <my_TTL default=%d> ]\n"
+ " [ -L flag turn on mcast_LOOP. default is off ]\n"
+ " [ -I <my_MCAST_IF default=NULL> ]\n",
+ VERSION, my_ACK_WAIT_TIMES, MCAST_ADDR, PORT, my_TTL);
+}
+
+
+int monitor_cmd(int cmd, int machineID)
+{
+ /*
+ Once this fx is called the old monitor will be
+ turned off. So, we need to make sure this fx
+ returns TRUE for a machine. Or the monitor_set_up
+ should fail.
+ */
+ int count =0, resp;
+ set_delay(0, FAST);
+ send_cmd(cmd, machineID);
+ while (1) { /* wait for ack */
+ resp = read_handle_complaint(cmd);
+ if (resp<0) { /* time-out */
+ ++count;
+ send_cmd(cmd, machineID);
+ if (count > SET_MON_WAIT_TIMES) {
+ return FALSE;
+ }
+ } else if (resp==1) { /* got ack */
+ return TRUE;
+ }
+ /* irrevelant resp -- continue */
+ }
+}
+
+int find_max_missing_machine(char *flags)
+{
+ /* flags[] -> which machines are not considered */
+ int i, index = -1;
+ int max = -1;
+ int threshold;
+
+ for(i=0; i<nTargets; ++i) {
+ int missing;
+ /** missing = total_missing_page[i]; **/
+ missing = missing_pages[i]; /* for last file (OPEN) or this file (other cmds)*/
+ if (missing > max && flags[i] == GOOD_MACHINE) {
+ max = missing;
+ index = i;
+ }
+ }
+
+ threshold = (get_nPages() > SWITCH_THRESHOLD*100) ?
+ get_nPages() / 100 : SWITCH_THRESHOLD;
+ if (index>=0) {
+ if (monitorID>=0) {
+ if (flags[monitorID]==BAD_MACHINE) return index;
+ /** if (max <= (total_missing_page[monitorID] + threshold)) **/
+ if (max <= (missing_pages[monitorID] + threshold))
+ return monitorID;
+ }
+ return index;
+ } else { /* could come here if all are busy*/
+ return -1;
+ }
+
+ /**
+ return ((index >= 0) &&
+ (monitorID >= 0) &&
+ (bad_machines[monitorID] == GOOD_MACHINE) &&
+ (max <= (total_missing_page[monitorID] + threshold))) ?
+ monitorID : index;
+ **/
+}
+
+void set_monitor(int mid)
+{
+ /* one machine is to be set. So need to succeed. */
+ if (monitor_cmd(SELECT_MONITOR_CMD, mid)) {
+ if (verbose >=1) fprintf(stderr, "Monitor - %s\n", id2name(mid));
+ return;
+ } else {
+ fprintf(stderr, "Fatal: monitor %s cannot be set up!\n", id2name(mid));
+ send_done_and_pr_msgs(-1.0, -1.0);
+ exit(BAD_EXIT);
+ }
+}
+
+void check_change_monitor(int undesired_index)
+{
+ /* this function changes the value of the global var: monitorID */
+ int i, count;
+ char * flags;
+
+ /* if all targets received the file, no need to go on */
+ if ((count=nNotRecv())==0) return;
+
+ if (count==1) {
+ i = iNotRecv();
+ if (bad_machines[i] == GOOD_MACHINE) {
+ monitorID = i;
+ /*
+ 'i' could be the current monitor.
+ We'd like to set it because there might
+ be something wrong with it if we come to
+ to this point.
+ */
+ set_monitor(monitorID);
+ }
+ return;
+ }
+
+ /* more than two machines do not receive the file yet */
+ /* flags mark those machines as BAD which we don't want to consider */
+ flags = malloc(nTargets * sizeof(char));
+ for(i=0; i<nTargets; ++i) {
+ flags[i] = (i == undesired_index || file_received[i] == FILE_RECV) ?
+ BAD_MACHINE : bad_machines[i];
+ }
+
+ /* check if all machines are not to be considered */
+ count = 0;
+ for(i=0; i<nTargets; ++i) {
+ if (flags[i]==BAD_MACHINE) ++count;
+ }
+ if (count==nTargets) {
+ /* Two ways to get here are
+ (1) during do_one_page:
+ prev_monitor = (not_recv) and other not_recv's are bad_machines.
+ (2) during after-ack:
+ prev_monitor = (recv) and other not_recv's are bad_machines.
+ */
+ free(flags);
+ set_monitor(monitorID);
+ return;
+ }
+
+ /* at least one not_recv (and good) machine is to be considered */
+ count = 0;
+ while (count < nTargets) {
+ i = find_max_missing_machine(flags);
+ /* if (i==monitorID) break;*/
+ monitorID = i;
+
+ if (monitorID < 0) break;
+
+ if (monitor_cmd(SELECT_MONITOR_CMD, monitorID)) {
+ if (verbose >=1) fprintf(stderr, "Monitor = %s\n", id2name(monitorID));
+ break;
+ } else {
+ flags[monitorID] = BAD_MACHINE;
+ /* Then, we attemp to set up other machine */
+ }
+ ++count;
+ }
+
+ free(flags);
+ if (monitorID < 0) {
+ fprintf(stderr, "Fatal: monitor machine cannot be set up!\n");
+ send_done_and_pr_msgs(-1.0, -1.0);
+ exit(BAD_EXIT);
+ }
+}
+
+void do_one_page(int page)
+{
+ int resp;
+ unsigned long rtt;
+ refresh_timer();
+ start_timer();
+ if (!send_page(page)) return;
+
+ /* first ignore all irrelevant resp */
+ resp=read_handle_complaint(SENDING_DATA);
+ while (resp==0) {
+ resp=read_handle_complaint(SENDING_DATA);
+ }
+
+ /* read_handle_complaint() waits n*interpage_interval at most */
+ if (resp==-1) {
+ /* delay_sec for readable() is set by set_delay() */
+ /******
+ at this point, the readable() returns without getting a reply
+ from monitorID after FACTOR*DT_PERPAGE (or DT_PERPAGE if without_monitor)
+ ****/
+ ++no_feedback_count;
+ if (verbose>=2) printf("no reply, count = %d\n", no_feedback_count);
+ update_rtt_hist(999999);
+ /* register this page as rtt = infinite --- the last element in rtt_hist */
+
+ if (no_feedback_count > NO_FEEDBACK_COUNT_MAX) {
+ /* switch to another client */
+ if (verbose >=2)
+ fprintf(stderr,
+ "Consecutive non_feedback exceeds limit, Changing monitor machine.\n");
+ /* if (nTargets>1 && (nTargets - nBadMachines()) > 1 && nNotRecv() > 1) */
+ check_change_monitor(monitorID); /* replace the current monitor */
+ no_feedback_count = 0;
+ }
+ return;
+ } else { /* resp == 1 */
+ end_timer();
+ update_time_accumulator();
+ rtt = get_accumulated_usec();
+ /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */
+ /* to do: update histogram */
+ if (verbose >=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt);
+ update_rtt_hist(rtt);
+
+ no_feedback_count = 0;
+ }
+}
+
+void send_cmd_and_wait_ack(int cmd_code)
+{
+ send_cmd(cmd_code, (int) ALL_MACHINES);
+ refresh_machine_status();
+ /*set_delay(0, FAST);*/
+ set_delay(0, DT_PERPAGE*FACTOR);
+ if (cmd_code==EOF_CMD) mod_machine_status();
+ wait_for_ok(cmd_code);
+ do_badMachines_exit();
+ /* check_change_monitor(-1); */
+}
+
+int do_file_changed_skip()
+{
+ /* if file is changed during syncing, then we should skip this file */
+ if (file_changed || !same_stat_for_file()) {
+ fprintf(stderr, "WARNING: file is changed during sycing -- skipping\n");
+ send_cmd_and_wait_ack(CLOSE_ABORT_CMD);
+ free_missing_page_flag();
+ ++skip_count;
+ return TRUE;
+ }
+ return FALSE;
+}
+
+int main(int argc, char *argv[])
+{
+ int c;
+ int cfile, ctotal_pages, cpage;
+ char * source_path = NULL;
+ char * synclist_path = NULL;
+ char * machine_list_file = NULL;
+ time_t tloc;
+ time_t time0, time1, t_page0, t_page;
+
+ while ((c = getopt(argc, argv, "v:w:A:P:T:LI:m:s:f:br:d:Xq")) != EOF) {
+ switch (c) {
+ case 'v':
+ verbose = atoi(optarg);
+ break;
+ case 'w':
+ my_ACK_WAIT_TIMES = atoi(optarg);
+ break;
+ case 'A':
+ my_MCAST_ADDR = optarg;
+ break;
+ case 'P':
+ my_PORT = atoi(optarg);
+ my_FLOW_PORT = my_PORT -1;
+ break;
+ case 'T':
+ my_TTL = atoi(optarg);
+ break;
+ case 'L':
+ my_LOOP = TRUE;
+ break;
+ case 'I':
+ my_IFname = optarg;
+ break;
+ case 'm':
+ machine_list_file = optarg;
+ break;
+ case 's':
+ source_path = optarg;
+ break;
+ case 'f':
+ synclist_path = optarg;
+ break;
+ case 'b':
+ backup = TRUE; /* if nPattern==0, backup means back up ALL files */
+ break;
+ case 'r': /* to selectively back up certain files as defined in the pattern */
+ if (!read_backup_pattern(optarg)) {
+ fprintf(stderr, "Failed in loading regex patterns in file = %s\n", optarg);
+ exit(BAD_EXIT);
+ }
+ break;
+ case 'd':
+ pattern_baseDir = strdup(optarg);
+ if (pattern_baseDir[strlen(pattern_baseDir)-1]=='/')
+ pattern_baseDir[strlen(pattern_baseDir)-1] = '\0' ; /* remove last / */
+ break;
+ case 'X':
+ toRmFirst = TRUE;
+ break;
+ case 'q':
+ quitWithOneBad = TRUE;
+ break;
+ case '?':
+ usage();
+ exit(BAD_EXIT);
+ }
+ }
+
+ if (!machine_list_file || !source_path || !synclist_path ) {
+ fprintf(stderr, "Essential options (-m -s -f) should be specified. \n");
+ usage();
+ exit(BAD_EXIT);
+ }
+
+ if (nPattern>0) backup = TRUE;
+ if (backup && nPattern>0) {
+ if (!pattern_baseDir) pattern_baseDir = strdup(source_path);
+ if (strlen(source_path) < strlen(pattern_baseDir) ||
+ strncmp(source_path, pattern_baseDir, strlen(pattern_baseDir))!=0) {
+ fprintf(stderr,
+ "src_path (%s) should include (and be longer than) pattern_baseDir (%s)",
+ source_path, pattern_baseDir);
+ exit(BAD_EXIT);
+ }
+ }
+
+ if (backup && toRmFirst) {
+ fprintf(stderr, "-B and -X cannot co-exist\n");
+ exit(BAD_EXIT);
+ }
+
+ get_machine_names(machine_list_file);
+ if (nTargets==0) {
+ fprintf(stderr, "No target to sync to\n");
+ exit(GOOD_EXIT);
+ }
+
+ if (!init_synclist(synclist_path, source_path)) exit(BAD_EXIT);
+
+ if (total_entries()==0) {
+ fprintf(stderr, "Nothing to sync in %s\n", synclist_path);
+ exit(GOOD_EXIT);
+ }
+
+ if (verbose >= 2)
+ fprintf(stderr, "Total number of files: %d\n", total_entries());
+
+ /* init the network stuff and some flags */
+ init_sends();
+ init_complaints();
+ init_machine_status(nTargets);
+
+ /* set up Cntl_C catcher */
+ Signal(SIGINT, do_cntl_c);
+
+ /* ------------------- set up monitor machine for doing feedback for each page sent */
+ check_change_monitor(-1);
+
+ /*-------------------------------------------------------------------------------*/
+
+ init_rtt_hist();
+ time0 = time(&tloc); /* start time */
+ t_page = 0; /* total time for sending pages */
+
+ /* -----------------------------Send the file one by one -----------------------------------*/
+ for (cfile = 1; cfile <= total_entries(); cfile++) { /* for each file to be synced */
+ if (!get_next_entry(cfile)) continue;
+
+ ctotal_pages = pages_for_file();
+
+ /*
+ By the time this file, which was obtained when synclist was
+ established some time ago, may no longer exist on the master.
+ So, we need to check the existence of this file.
+ fexist() also opens the file so that it won't be deleted
+ between here and the send-page-loop.
+ */
+ if (ctotal_pages > 0 && (!same_stat_for_file() ||
+ !fexist(current_entry()))) {
+ /* go to next file if this file has changed or does not exist */
+ fprintf(stderr, "%s (%d out of %d; Extinct file)\n",
+ getFilename(), current_entry(), total_entries());
+ adjust_totals();
+ continue;
+ }
+
+ if (ctotal_pages < 0) {
+ fprintf(stderr, "%s (%d out of %d; to delete)\n",
+ getFilename(), current_entry(), total_entries());
+ } else {
+ fprintf(stderr, "%s (%d out of %d; %d pages)\n",
+ getFilename(), current_entry(), total_entries(), ctotal_pages);
+ }
+
+ /* send_open_cmd */
+ pack_open_file_info();
+ send_cmd_and_wait_ack(OPEN_FILE_CMD);
+
+ /*
+ ctotal_pages < 0, for deletion
+ ctotal_pages = 0, regular file with no content.
+ or directory, softlink, hardlink
+ both should have been finished with OPEN_FILE_CMD
+ */
+ if (ctotal_pages <= 0) continue;
+
+ /* for other regular files */
+ init_missing_page_flag(ctotal_pages);
+ refresh_missing_pages(); /* total missing pages for this file for each tar */
+
+ /* ----- sending file data ----- first round */
+ t_page0 = time(&tloc);
+ no_feedback_count = 0;
+ for (cpage = 1; cpage <= ctotal_pages; cpage++) {
+ /*
+ the mode field and delay may be changed by change_monitor
+ */
+ set_delay(0, DT_PERPAGE*FACTOR);
+ set_mode(SENDING_DATA);
+ do_one_page(cpage);
+ }
+
+ if (do_file_changed_skip()) continue;
+
+ /* send "I am done with the first round" */
+ reset_has_missing();
+ refresh_file_received(); /* to record machines that have received this file */
+ send_cmd_and_wait_ack(EOF_CMD);
+
+ /* after the first run, before we go to 2nd and 3rd run, */
+ if (has_missing_pages()) check_change_monitor(-1);
+
+ /* ----- sending file data again, 2nd and 3rd and ...n-th round */
+ reset_has_sick();
+ while (has_missing_pages()) {
+ int c; /****************/
+ no_feedback_count = 0;
+
+ c = 0;
+ for (cpage = 1; cpage <= ctotal_pages; cpage++){
+ if (is_it_missing(cpage-1)) {
+ set_delay(0, DT_PERPAGE*FACTOR);
+ set_mode(RESENDING_DATA);
+ do_one_page(cpage);
+ page_sent(cpage-1);
+ ++c; /*************/
+ }
+ }
+ if (verbose>=1)
+ fprintf(stderr, "re-sent N_pages = %d\n", c); /*************/
+
+ /* eof */
+ reset_has_missing();
+ send_cmd_and_wait_ack(EOF_CMD);
+ if (has_sick_machines()) {
+ break;
+ /* one machine can reach sick_state while some others are still
+ in missing_page state.
+ This break here is ok in terms of skipping this file.*/
+ } else {
+ check_change_monitor(-1);
+ }
+ };
+
+ t_page += (time(&tloc) - t_page0);;
+ if (do_file_changed_skip()) continue;
+
+ /* close file */
+ send_cmd_and_wait_ack((has_sick_machines()) ? CLOSE_ABORT_CMD : CLOSE_FILE_CMD);
+ if (has_sick_machines()) {
+ fprintf(stderr, "Skip_syncing %s\n", getFilename());
+ }
+ free_missing_page_flag();
+ } /* end of the for each_file loop */
+
+ time1= time(&tloc);
+ return send_done_and_pr_msgs( ((double)(time1 - time0))/ 60.0, ((double)t_page)/60.0);
+}
+
diff --git a/src/multicatcher.c b/src/multicatcher.c
new file mode 100644
index 0000000..76fbc5e
--- /dev/null
+++ b/src/multicatcher.c
@@ -0,0 +1,181 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ verision 3.0 major update
+ -- large file support
+ -- platform independence (between linux, unix)
+ -- backup feature (as in rsync)
+ -- removing meta-file-info
+ -- catching slow machine as the feedback monitor
+ -- mcast options
+ version 3.0.[1-9] bug fixes
+ -- logic flaw which under certain condition
+ caused premature dropout due to
+ unsuccessful EOF, CLOSE_FILE
+ and caused unwanranted SIT-OUT cases.
+ -- tested on Debian 64 bit arch by Nicolas Marot in France
+ version 3.1.0
+ -- codes for IPv6 are ready (but not tested)
+ IPv4 is tested ok.
+ version 3.2.0
+ -- monitor change improvement
+ -- handshake improvement (e.g. seq #)
+ -- if one machine skips a file, all will NOT close()
+
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+
+extern int machineID;
+extern int verbose;
+extern int isMonitor;
+extern char * baseDir;
+extern char * cmd_name[];
+extern char * backup_suffix;
+
+char * my_MCAST_ADDR = MCAST_ADDR;
+char * my_IFname = MCAST_IF;
+int my_FLOW_PORT = FLOW_PORT;
+int my_PORT = PORT;
+
+void usage()
+{
+ fprintf(stderr,
+ "multicatcher (to receive files from multicaster) version %s\n"
+ " Option list:\n"
+ " [ -v <verbose_level 0-2> ]\n"
+ " -------- essential options ----------------------------------\n"
+ " -t <destination Dirpath>\n"
+ " -i <machineID 0 originated id numbers>\n"
+ " -------- options for backup ---------------------------------\n"
+ " [ -u <suffix> for backup files if -b is on in multicaster ]\n"
+ " -------- mcast options --------------------------------------\n"
+ " [ -A <my_mcast_address default=%s)> **same as for multicaster ]\n"
+ " [ -P <my_PORT default=%d> **same as for multicaster ]\n"
+ " [ -I <my_MCAST_IF default=NULL> ]\n",
+ VERSION, MCAST_ADDR, PORT);
+}
+
+int main(int argc, char *argv[])
+{
+ int old_mode; /* hp: from char to int for mode */
+ int mode;
+ int c;
+
+ while ((c = getopt(argc, argv, "v:A:P:t:i:u:I:")) != EOF) {
+ switch (c) {
+ case 'v':
+ verbose = atoi(optarg);
+ break;
+ case 'A':
+ my_MCAST_ADDR = optarg;
+ break;
+ case 'P':
+ my_PORT = atoi(optarg);
+ my_FLOW_PORT = my_PORT -1;
+ break;
+ case 'I':
+ my_IFname = optarg;
+ break;
+ case 't':
+ baseDir = optarg;
+ break;
+ case 'i':
+ machineID = atoi(optarg);
+ break;
+ case 'u':
+ backup_suffix = strdup(optarg);
+ break;
+ case '?':
+ usage();
+ exit(BAD_EXIT);
+ }
+ }
+
+ if (machineID < 0 || !baseDir) {
+ fprintf(stderr, "Essential options (-t -i) should be specified. \n");
+ usage();
+ exit(BAD_EXIT);
+ }
+
+ fprintf(stderr, "my_pid= %lu\n", getpid());
+
+ if (!backup_suffix) default_suffix();
+ get_tmp_suffix(); /* get a unique tmp_name for the tmp file */
+
+ init_page_reader();
+ init_complaint_sender();
+
+ /* initialize random numbers */
+ srand(time(NULL) + getpid());
+
+ /* set the timeout for readable() to be about 3 to 6 seconds
+ Actually, this setting is arbitrary.
+ The timeout of readable() does not play a role in
+ the logic flow.
+ */
+ set_delay( 3 + rand() % 6, 0);
+ mode = old_mode = TEST;
+
+ /* -----------------------The main loop---------------------------
+ Multicatcher simply waits for any incoming UDP,
+ reads and handles it.
+ If the UDP contains file content, it is placed in the right place.
+ If the UDP contains an instruction, it is carried out.
+
+ Multicatcher never complains unless being told so.
+ For example, as it is now, multicatcher does not complain
+ about the rate of incoming UDP being too fast to handle.
+ If multicatcher cannot keep up with the speed, it just
+ loses certain pages in a file which will be reported
+ later when multicaster requests acknowledgement.
+ ---------------------------------------------------------------- */
+ while(1) { /* loop for all incoming pages */
+ if (verbose>=2)
+ fprintf(stderr, "Starting listen loop with mode %d, old_mode = %d\n", mode, old_mode);
+
+ /* the major task here */
+ mode = read_handle_page();
+ if (verbose>=2) fprintf(stderr, "new page in mode %d\n", mode);
+
+ if (mode == ALL_DONE_CMD) break;
+
+ /* for debugging purpose */
+ if ((old_mode != mode)) {
+ if (verbose>=2 && mode <= 5) fprintf(stderr, "%s\n", cmd_name[mode]);
+ }
+
+ /* got no data? */
+ if (mode == TIMED_OUT) {
+ if (verbose>=2) fprintf(stderr, "*");
+ }
+
+ old_mode = mode;
+ } /* end of incoming page loop */
+
+ if (verbose>=1) fprintf(stderr, "Done!\n");
+ return 0;
+}
diff --git a/src/page_reader.c b/src/page_reader.c
new file mode 100644
index 0000000..8f349a0
--- /dev/null
+++ b/src/page_reader.c
@@ -0,0 +1,426 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+
+/* the following is needed on Sun but not on linux */
+#ifdef _SUN
+#include <sys/filio.h>
+#endif
+
+extern int machineID;
+extern int verbose;
+extern char * my_MCAST_ADDR; /* defined in multicatcher.c */
+extern char * my_IFname;
+extern int my_PORT;
+extern unsigned int current_file_id;
+
+int isMonitor; /* flag = if this target machine is a designated monitor */
+int nPage_recv; /* counter for the number of pages received for a file */
+int machineState; /* there are four states during one file transmission */
+int isFirstPage=TRUE; /* flag */
+
+/* The followings are used to determine sick condition */
+int current_missing_pages;
+int last_missing_pages;
+int sick_count;
+
+/* receive socket */
+int recfd;
+#ifndef IPV6
+struct sockaddr_in rec_addr;
+#else
+struct sockaddr_in6 rec_addr;
+#endif
+
+/*
+ Receive buffer for storing the data obtained from UDP
+ The format:
+ (5*sizeof(int) bytes header) + (PAGE_SIZE data area)
+
+ The header has five int_type (4 bytes) int's.
+ (1) mode -- for master to give instructions to the target machines.
+ (2) current file index (starting with 1)
+ (3) current page index (starting with 1)
+ (4) bytes that has been sent in this UDP page
+ (5) total number of pages.
+
+ data_ptr points to the data area.
+*/
+int *mode_ptr; /* hp: change from char to int */
+int *total_pages_ptr;
+int *current_page_ptr;
+int *bytes_sent_ptr, *current_file_ptr;
+char *data_ptr;
+char rec_buf[PAGE_BUFFSIZE];
+
+void init_page_reader()
+{
+ /*struct ip_mreq mreq;*/
+ int rcv_size;
+
+ machineState = IDLE_STATE;
+ isMonitor = FALSE;
+
+ /* Prepare buffer pointers */
+ mode_ptr = (int*)rec_buf;
+ current_file_ptr = (int *)(mode_ptr + 1);
+ current_page_ptr = (int *)(current_file_ptr + 1);
+ bytes_sent_ptr = (int *)(current_page_ptr + 1);
+ total_pages_ptr = (int *)(bytes_sent_ptr + 1);
+ data_ptr = (char *)(total_pages_ptr + 1);
+
+ /* Set up receive socket */
+ if (verbose>=2) fprintf(stderr, "setting up receive socket\n");
+ recfd = rec_socket(&rec_addr, my_PORT);
+
+ /* Join the multicast group */
+ /*inet_pton(AF_INET, MCAST_ADDR, &(mreq.imr_multiaddr.s_addr));
+ mreq.imr_multiaddr.s_addr = inet_addr(my_MCAST_ADDR);
+ mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+
+ if (setsockopt(recfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void*)&mreq, sizeof(mreq)) < 0){
+ perror("Joining Multicast Group");
+ }
+ */
+
+ if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) {
+ perror("Joining Multicast Group");
+ }
+
+ /* Increase socket receive buffer */
+ rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE;
+ if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){
+ perror("Expanding receive buffer for page_reader");
+ }
+
+}
+
+/*
+ This is the heart of multicatcher.
+ It parses the incoming pages and do proper reactions according
+ to the mode (command code) encoded in the first 4 bytes in an UDP page.
+ It returns the mode.
+*/
+int read_handle_page()
+{
+ #ifndef IPV6
+ struct sockaddr_in return_addr;
+ #else
+ struct sockaddr_in6 return_addr;
+ #endif
+ int bytes_read;
+ socklen_t return_len = (socklen_t)sizeof(return_addr);
+ int mode_v, bytes_sent_v, total_pages_v, current_file_v, current_page_v;
+
+ /* -----------receiving data -----------------*/
+ if (readable(recfd) == 1) { /* there is data coming in */
+ /* check_queue(); This might be useful. But need more study. */
+ /* get data */
+ bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0,
+ (struct sockaddr *)&return_addr,
+ (socklen_t*) &return_len);
+
+ bytes_sent_v = ntohl(*bytes_sent_ptr);
+ if (bytes_read != (bytes_sent_v + HEAD_SIZE))
+ return NULL_CMD;
+
+ /* convert from network byte order to host byte order */
+ mode_v = ntohl(*mode_ptr);
+ total_pages_v = ntohl(*total_pages_ptr);
+ current_file_v = ntohl(*current_file_ptr);
+ current_page_v = ntohl(*current_page_ptr);
+
+ if (isFirstPage) {
+ update_complaint_address(&return_addr);
+ isFirstPage = FALSE;
+ }
+
+ /* --- process various commands (modes) */
+ switch (mode_v) {
+ case TEST:
+ /* It is just a test packet? */
+ fprintf(stderr, "********** Received test packet **********\n");
+ return mode_v;
+
+ case SELECT_MONITOR_CMD:
+ if (current_page_v == machineID) {
+ isMonitor = TRUE;
+ send_complaint(MONITOR_OK, machineID, 0, 0);
+ } else {
+ isMonitor = FALSE;
+ }
+ return mode_v;
+
+ case OPEN_FILE_CMD:
+ if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)
+ && machineState == IDLE_STATE) {
+ /* get info about this file */
+ if (verbose>=1)
+ fprintf(stderr, "open file id= %d\n", current_file_v);
+ if (!extract_file_info(data_ptr, current_file_v, total_pages_v))
+ return mode_v;
+
+ /* different tasks here */
+ /* open file, rmdir, unlink */
+ if (total_pages_v < 0) { /* delete a file or a directory */
+ if (!delete_file(TRUE)) return mode_v; /* this fx can be re-entered many times */
+ /* machineState remains to be IDLE_STATE */
+ } else if (total_pages_v == 0) {
+ /* handle an empty file, or (soft)link or directory */
+ if (!check_zero_page_entry()) return mode_v; /* can re-enter many times */
+ /* machineState remains to be IDLE_STATE */
+ } else { /* a regular file */
+ if (!open_file()) return mode_v;
+ /* the file has been opened. */
+ sick_count = 0;
+ current_missing_pages =0;
+ last_missing_pages = nPages_for_file(current_file_v);
+ machineState = GET_DATA_STATE;
+ }
+ send_complaint(OPEN_OK, machineID, 0, current_file_id); /* ack */
+ nPage_recv = 0;
+ return mode_v;
+ }
+ /*
+ We must be in GET_DATA_STATE.
+ OPEN_OK ack has been sent back in the previous block.
+ However,
+ the master may not have received the ack.
+ In that case the master will send back the open_file_cmd again.
+ */
+ if ((current_page_v == (int) ALL_MACHINES ||current_page_v == (int) machineID)
+ && (machineState == GET_DATA_STATE)) { /****/
+ send_complaint(OPEN_OK, machineID, 0, current_file_id);
+ }
+ return mode_v;
+
+ case EOF_CMD:
+ /**********/
+ if (verbose>=1)
+ fprintf(stderr, "***** EOF received for id=%d state=%d id=%d, file=%d\n",
+ current_page_v, machineState, machineID, current_file_v);
+
+ /* the following happnens when this machine was previously out-of-pace
+ and was labeled as 'BAD MACHINE' by the master.
+ The master has proceeded with the syncing process without
+ waiting for this machine to finish the process in one of the previous files.
+ Since under normal condition, this machine should not expect to see
+ current_file changes except when OPEN_FILE_CMD is received.
+ */
+ if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
+
+ /* normal condition */
+ if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)
+ && machineState == GET_DATA_STATE) { /* GET_DATA_STATE */
+ /* check missing pages and send back missing-page-request */
+ current_missing_pages = ask_for_missing_page(); /* = total # of missing_pages */
+
+ if (current_page_v == (int) machineID) {
+ /* master is asking for my EOF_ack */
+ if (current_missing_pages == 0) {
+ /* w/o assuming how we get to this point ... */
+ machineState = DATA_READY_STATE;
+ send_complaint(EOF_OK, machineID, 0, current_file_id);
+ } else {
+ send_complaint(MISSING_TOTAL, machineID,
+ current_missing_pages , current_file_id);
+ missing_page_stat(); /* this has to be done before close_file() ******************/
+ }
+ return mode_v;
+ }
+
+ /***
+ master is asking everyone, (after master sent or re-sent pages)
+ so, we do some book-keeping procedures (incl state change)
+ ***/
+ if (verbose >=1)
+ fprintf(stderr, "missing_pages = %d, nPages_received = %d file = %d\n",
+ current_missing_pages, nPage_recv, current_file_v); /************/
+
+ nPage_recv = 0;
+
+ if (current_missing_pages == 0) {
+ /*
+ There is no missing page.
+ Change the state.
+ */
+ machineState = DATA_READY_STATE;
+ send_complaint(EOF_OK, machineID, 0, current_file_id);
+ return mode_v;
+ } else {
+ /*
+ There are missing pages.
+ If we still miss many pages for SICK_THRESHOLD consecutive times,
+ then we are sick. e.g. machine CPU does not give multicatcher
+ enough time to process incoming UDP's OR the disk I/O is too slow.
+ */
+
+ if ((SICK_RATIO)*(double)last_missing_pages < (double)current_missing_pages) {
+ ++sick_count;
+ if (sick_count > SICK_THRESHOLD) {
+ machineState = SICK_STATE;
+ send_complaint(SIT_OUT, machineID,
+ 0, current_file_id); /* no more attempt to receive */
+ /* master may send more pages from requests from other machines
+ but this machine will mark this file as 'sits out receiving' */
+ } else {
+ /* not sick enough yet */
+ send_complaint(MISSING_TOTAL, machineID,
+ current_missing_pages, current_file_id);
+ missing_page_stat(); /* this has to be done before close_file() ******************/
+ /* master will send more pages */
+ }
+ } else { /* we are getting enough missing pages this time to keep up */
+ sick_count = 0; /* break the consecutiveness */
+ send_complaint(MISSING_TOTAL, machineID,
+ current_missing_pages, current_file_id);
+ missing_page_stat(); /* this has to be done before close_file() ******************/
+ /* master will send more pages */
+ }
+ last_missing_pages = current_missing_pages;
+ return mode_v;
+ }
+ } /* end GET_DATA_STATE */
+
+ /* After state change, we still get request for ack.
+ send back ack again */
+ if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)) {
+ switch (machineState) {
+ case DATA_READY_STATE:
+ send_complaint(EOF_OK, machineID,
+ 0, current_file_id);
+ return mode_v;
+ case SICK_STATE:
+ send_complaint(SIT_OUT, machineID,
+ 0, current_file_id); /* just an ack, even for sick state*/
+ return mode_v;
+ }
+ }
+ return mode_v;
+
+ case CLOSE_FILE_CMD:
+ if (verbose>=1)
+ fprintf(stderr, "***** CLOSE received for id=%d state=%d id=%d, file=%d\n",
+ current_page_v, machineState, machineID, current_file_v);
+
+ if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
+
+ if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) {
+ if (machineState == DATA_READY_STATE) {
+ if (!close_file()) { return mode_v; };
+ set_owner_perm_times();
+ machineState = IDLE_STATE;
+ send_complaint(CLOSE_OK, machineID, 0, current_file_id);
+ return mode_v;
+ } else if (machineState == IDLE_STATE) {
+ /* send ack back again because we are asked */
+ send_complaint(CLOSE_OK, machineID, 0, current_file_id);
+ return mode_v;
+ } else { /* other states -- we should not be here*/
+ /* if (machineState == SICK_STATE || machineState == GET_DATA_STATE) */
+ /* SICK_STATE --> we are too slow in getting missing pages
+ if one of the machines is sick, master will send out CLOSE_ABORT
+ GET_DATA_STATE -->
+ We are not supposed to be in GET_DATA_STATE,
+ so consider it a sick_state */
+ fprintf(stderr, "*** should not be here -- state=%d\n", machineState);
+ if (!rm_tmp_file()) { return mode_v; };
+ machineState = IDLE_STATE;
+ send_complaint(SIT_OUT, machineID, 0, current_file_id);
+ /* make sick_count larger than threshold for GET_DATA_STATE */
+ sick_count = SICK_THRESHOLD + 10000;
+ return mode_v;
+ }
+ }
+ return mode_v;
+
+ case CLOSE_ABORT_CMD:
+ if (verbose>=1)
+ fprintf(stderr, "***** CLOSE_ABORT received for id=%d state=%d id=%d, file=%d\n",
+ current_page_v, machineState, machineID, current_file_v);
+
+ if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
+
+ if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) {
+ if (!rm_tmp_file()) { return mode_v; };
+ machineState = IDLE_STATE;
+ send_complaint(CLOSE_OK, machineID, 0, current_file_id);
+ }
+ return mode_v;
+
+ case SENDING_DATA:
+ case RESENDING_DATA:
+ if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
+ /*
+ otherwise, go ahead...
+ */
+ if (verbose>=2) {
+ fprintf(stderr, "Got %d bytes from page %d of %d for file %d mode=%d\n",
+ bytes_read - HEAD_SIZE,
+ current_page_v, total_pages_v,
+ current_file_v + 1, mode_v);
+ }
+
+ /* timing the disk IO */
+ /* start_timer(); */
+
+ write_page(current_page_v, data_ptr, bytes_read - HEAD_SIZE);
+ if (isMonitor) send_complaint(PAGE_RECV, machineID, 0, current_file_id);
+
+ /*
+ end_timer();
+ update_time_accumulator();
+ */
+
+ /* Yes, we have just read a page */
+ ++nPage_recv;
+ return mode_v;
+
+ case ALL_DONE_CMD:
+ /*
+ clear up the files.
+ */
+ /*** since we do not know if there are other machines
+ that are NOT in data_ready_state,
+ to maintain equality, it is best to just
+ remove tmp_file without close_file() ***/
+ rm_tmp_file();
+ return mode_v;
+
+ default:
+ return mode_v;
+ } /* end of switch */
+ } else {
+ /* No, the read is timed out */
+ return TIMED_OUT;
+ }
+}
+
diff --git a/src/parse_synclist.c b/src/parse_synclist.c
new file mode 100644
index 0000000..0c2a7aa
--- /dev/null
+++ b/src/parse_synclist.c
@@ -0,0 +1,320 @@
+/*
+ Copyright (C) 2006-2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <stdlib.h>
+#include <limits.h> /* to define PATH_MAX */
+
+/*
+ Get the next to-be-synced file from synclist which is the output of
+ parseRsyncList.C
+ The latter in turn parses the output from rsync.
+ In other words, we use rsync in dry-run mode to get the
+ files that need to be synced.
+
+ to port to large_file environment: use off_t for size
+*/
+
+extern int verbose;
+
+char basedir[PATH_MAX]; /* baseDir for the syncing */
+FILE *fd;
+struct stat st; /* stat for the current file */
+
+unsigned int nEntries;
+int cur_entry; /* file id -- <0 if it needs backup */
+unsigned int nPages;
+unsigned int last_bytes; /* number of bytes for the last page */
+int toRmFirst = FALSE; /* flag rm existing file and then sync */
+int file_changed = FALSE; /* flag to indicate if file has been changed during syncing */
+
+unsigned int total_pages;
+off_t total_bytes;
+
+int isDelete, isHardlink;
+char filename[PATH_MAX];
+char fullname[PATH_MAX];
+char linktar[PATH_MAX];
+
+int init_synclist(char * synclist_path, char *bdir)
+{
+ char line[PATH_MAX];
+ nEntries = 0;
+ strcpy(basedir, bdir);
+
+ if ((fd = fopen(synclist_path, "r")) == NULL) {
+ fprintf(stderr, "Cannot open synclist file = %s \n", synclist_path);
+ return FAIL;
+ }
+
+ while (fgets(line, PATH_MAX, fd) != NULL) nEntries++;
+ if (nEntries == 0) {
+ fclose(fd);
+ fprintf(stderr, "Empty entires in synclist file = %s\n", synclist_path);
+ return SUCCESS; /* OK, nothing to sync */
+ }
+ rewind(fd);
+
+ cur_entry = 0;
+ total_pages = 0;
+ total_bytes = 0;
+
+ return SUCCESS;
+}
+
+/*
+ pages_for_file calculates the number (int) of pages for the current file
+ and returns that number in an (int) type.
+ So, max_pages = 2**31 = 2147483648
+ which corresponds to a file_size of 2**31 * 64512 = 1.38e14
+ [ general limit = (1<<(sizeof(int)*8)) * PAGE_SIZE ]
+ At that time, the type of page_number needs to be upgraded :)
+
+ to_delete -> -1
+ normal_file -> number_of_pages
+ softlink -> 0
+ hardlink -> 0
+ directory -> 0
+*/
+int pages_for_file()
+{
+ if (isDelete) { /* to be deleted directory or file */
+ return TO_DELETE;
+ }
+
+ if (S_ISREG(st.st_mode)){
+ int n;
+ if (st.st_nlink > 1) return 0; /* hardlink file */
+
+ n = (int)((st.st_size)/(off_t)PAGE_SIZE); /* regular file */
+ if ((st.st_size)%((off_t)PAGE_SIZE) == 0) {
+ last_bytes = (unsigned int)(PAGE_SIZE);
+ return n;
+ } else {
+ last_bytes = (unsigned int)(st.st_size - (off_t)n * (off_t)PAGE_SIZE);
+ return n+1;
+ }
+ /*return ((st.st_size)%((off_t)PAGE_SIZE) == 0) ? n : n+1 ;*/
+ }
+ if (S_ISLNK(st.st_mode)){
+ return 0; /* softlink */
+ }
+ return 0; /* directory */
+}
+
+off_t bytes_for_file()
+{
+ return st.st_size;
+}
+
+unsigned int get_nPages() /* for this file */
+{
+ return nPages;
+}
+
+void strip(char * str)
+{
+ /* remove trailing \n and spaces */
+ char *pc = &str[strlen(str)-1];
+ while (*pc == ' ' || *pc == '\n') {
+ *(pc--) = '\0';
+ }
+}
+
+int same_stat_for_file()
+{
+ /* check if current stat is same as that when get_next_entry is called */
+ struct stat st1;
+
+ if(lstat(fullname, &st1) < 0) {
+ if (verbose >=1) perror(fullname);
+ return FAIL;
+ }
+
+ if (st1.st_size != st.st_size || st1.st_mode != st.st_mode ||
+ st1.st_mtime != st.st_mtime) {
+ return FAIL; /* the file has changed */
+ }
+ return SUCCESS;
+
+}
+
+int is_hardlink_line(char * line)
+{
+ /* when line is in the form of
+ string1 string2
+ it can be either a filename (string1 string2)
+ or a hardlink string1 => string2
+ */
+ struct stat st;
+ char fn[PATH_MAX];
+ strcpy(fn, basedir);
+ strcat(fn, "/");
+ strcat(fn, line);
+
+ /* if the whole line is not a file, then we are dealing with hardlink case */
+ return (lstat(fn, &st) < 0);
+ /* cannot deal with the situation
+ str1 is a file
+ str2 is a hardlink to str1
+ str1 str2 is a file
+ AND if we need to sync
+ str1 and 'str1 str2' at the same time.
+ But this situation is very rare. */
+}
+
+int get_next_entry(int current_file_id)
+{
+ char *c;
+ char line[PATH_MAX];
+
+ /* inside this function, cur_entry is set to be positve to facilitate processing */
+ cur_entry = current_file_id; /* from main loop's index, starting with 1 */
+
+ isDelete = FALSE;
+ isHardlink = FALSE;
+
+ fgets(line, PATH_MAX, fd);
+ strip(line);
+
+ if (current_file_id == nEntries) {
+ fclose(fd); /* close the synclist file */
+ if (verbose>=2) fprintf(stderr, "no more entry in synclist.\n");
+ }
+
+ if (verbose>=2) {
+ fprintf(stderr, "Got current entry = %d (total= %d)\n", cur_entry, nEntries);
+ fprintf(stderr, "%s\n", line);
+ }
+
+ strcpy(fullname, basedir);
+ strcat(fullname, "/");
+ if (strncmp(line, "deleting ", 9)==0) {
+ isDelete = TRUE;
+ nPages = -1;
+ strcat(fullname, &line[9]);
+ strcpy(filename, &line[9]);
+ if (needBackup(fullname)) cur_entry = -cur_entry;
+ return SUCCESS;
+ } else if ((c = strchr(line, ' '))!=NULL && is_hardlink_line(line)) {
+ /* is it a hardlink -- two filenames separated by a space */
+ char fn[PATH_MAX];
+ isHardlink = TRUE;
+ strncpy(fn, line, (c - line));
+ fn[c-line] = '\0';
+ strcat(fullname, fn);
+ strcpy(filename, fn);
+ strcpy(linktar, c+1);
+ } else {
+ /* normal, single entry */
+ strcat(fullname, line);
+ strcpy(filename, line);
+ }
+
+ /* update stat */
+ if(lstat(fullname, &st) < 0) {
+ if (verbose >=1) perror(fullname);
+ return FAIL;
+ }
+
+ if (S_ISLNK(st.st_mode)) {
+ int linklen;
+ linklen = readlink(fullname, linktar, PATH_MAX);
+ /* readlink doesn't null-terminate the string */
+ *(linktar + linklen) = '\0';
+ } else if (st.st_nlink>1 && !isHardlink) {
+ /* this is the target file that others (hard)link to.
+ treat it like a normal file */
+ st.st_nlink = 1;
+ }
+
+ nPages = pages_for_file();
+ if (nPages > 0) { /* for regular files */
+ total_pages += nPages;
+ total_bytes += st.st_size;
+ }
+
+ if (needBackup(fullname)) cur_entry = -cur_entry;
+
+ file_changed = FALSE;
+
+ return SUCCESS;
+}
+
+void adjust_totals()
+{
+ if (nPages > 0) {
+ total_pages -= nPages;
+ total_bytes -= st.st_size;
+ }
+}
+
+/* some accessors */
+unsigned int total_entries() { return nEntries; }
+
+int current_entry() { return cur_entry; }
+
+int is_softlink() { return S_ISLNK(st.st_mode); }
+
+int is_hardlink() { return isHardlink; }
+
+int is_directory() { return S_ISDIR(st.st_mode); }
+
+char * getFilename() { /* relative to basedir */ return &filename[0]; }
+
+char * getFullname() { return &fullname[0]; }
+
+/*
+ The following three fx are used to fill file_info into the send_buffer.
+ They return the number of bytes being written into the buf, including the \0 byte.
+*/
+unsigned int fill_in_stat(char *buf)
+{
+ /* load into buf area the stat info in ascii format */
+ if (isDelete)
+ sprintf(buf, "0 0 0 0 0 0 0 0");
+ else {
+ #ifdef _LARGEFILE_SOURCE
+ sprintf(buf, "%u %u %u %u %llu %lu %lu %d", st.st_mode, st.st_nlink,
+ st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime,
+ toRmFirst);
+ #else
+ sprintf(buf, "%u %u %u %u %lu %lu %lu %d", st.st_mode, st.st_nlink,
+ st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime,
+ toRmFirst);
+ #endif
+ }
+
+ return strlen(buf)+1;
+}
+
+unsigned int fill_in_filename(char * buf)
+{
+ strcpy(buf, filename);
+ return strlen(buf)+1;
+}
+
+unsigned int fill_in_linktar(char *buf)
+{
+ strcpy(buf, linktar);
+ return strlen(buf)+1;
+}
+
+
diff --git a/src/proto.h b/src/proto.h
new file mode 100644
index 0000000..6569d44
--- /dev/null
+++ b/src/proto.h
@@ -0,0 +1,182 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef __main_proto_h
+#define __main_proto_h
+
+/* parse_synclist.c */
+unsigned int total_entries();
+unsigned int fill_in_stat(char *buf);
+unsigned int fill_in_linktar(char *buf);
+unsigned int fill_in_filename(char * buf);
+unsigned int get_nPages();
+int pages_for_file();
+char * getFilename();
+char * getFullname();
+int same_stat_for_file();
+void strip(char * str);
+int current_entry();
+int get_next_entry(int current_file_id);
+int is_softlink();
+int is_directory();
+int is_hardlink();
+int init_synclist(char * synclist_path, char *bdir);
+void adjust_totals();
+
+/* backup.c */
+int read_backup_pattern(char * fpat_file);
+int needBackup(char * filename);
+
+/* sends.c */
+void init_sends();
+void set_mode(int new_mode);
+int send_page(int page);
+void send_test();
+void send_cmd(int code, int machine_id);
+void send_all_done_cmd();
+int fexist(int entry) ;
+void pack_open_file_info();
+void my_exit(int);
+
+/* complaints.c */
+void init_complaints();
+int read_handle_complaint(int cmd);
+void wait_for_ok(int code);
+void refresh_machine_status();
+void refresh_missing_pages();
+void mod_machine_status();
+void refresh_file_received();
+int nNotRecv();
+int iNotRecv();
+int is_it_missing(int page);
+int has_missing_pages();
+int has_sick_machines();
+void init_missing_page_flag(int n);
+void free_missing_page_flag();
+void refresh_machine_status();
+void init_machine_status(int n);
+void page_sent(int page);
+int nBadMachines();
+void do_badMachines_exit();
+int pr_missing_pages();
+int send_done_and_pr_msgs(double, double);
+void do_cntl_c(int signo);
+void set_has_missing();
+void reset_has_missing();
+void set_has_sick();
+void reset_has_sick();
+
+
+/* setup_socket.c */
+void set_delay(int secs, int usecs);
+void get_delay(int * secs, int * usecs);
+int readable(int fd);
+#ifndef IPV6
+int complaint_socket(struct sockaddr_in *addr, int port);
+int send_socket(struct sockaddr_in *addr, char * cp, int port);
+int rec_socket(struct sockaddr_in *addr, int port);
+#else
+int rec_socket(struct sockaddr_in6 *addr, int port);
+int send_socket(struct sockaddr_in6 *addr, char * cp, int port);
+int complaint_socket(struct sockaddr_in6 *addr, int port);
+#endif
+
+/* set_mcast.c */
+int mcast_set_if(int sockfd, const char *ifname, u_int ifindex);
+int mcast_set_loop(int sockfd, int onoff);
+int mcast_set_ttl(int sockfd, int val);
+
+/* set_catcher_mcast.c */
+int Mcast_join(int sockfd, const char *mcast_addr,
+ const char *ifname, u_int ifindex);
+void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr);
+
+/* complaint_sender.c */
+void fill_in_int(int i);
+void init_fill_ptr();
+void send_complaint(int complaint, int mid, int page, int file);
+void init_complaint_sender();
+#ifndef IPV6
+void update_complaint_address(struct sockaddr_in *sa);
+#else
+void update_complaint_address(struct sockaddr_in6 *sa);
+#endif
+
+/* page_reader.c */
+void init_page_reader();
+int check_queue();
+int read_handle_page();
+
+/* file_operations.c */
+void get_tmp_suffix();
+int extract_file_info(char * buf, int n_file, unsigned int n_pages);
+int open_file();
+int close_file();
+int rm_tmp_file();
+int delete_file(int to_check_dir_type);
+int touch_file();
+int nPages_for_file();
+int has_all_pages();
+int ask_for_missing_page();
+void missing_page_stat();
+void write_page(int page, char* data_ptr, int bytes);
+int is_missing(int page);
+void page_received(int page);
+int set_owner_perm_times();
+void close_last_file();
+int check_zero_page_entry();
+void default_suffix();
+
+/* timing */
+void refresh_timer();
+double get_accumulated_time();
+void start_timer();
+void end_timer();
+void update_time_accumulator();
+double get_accumulated_usec();
+void update_rtt_hist(unsigned int rtt);
+void pr_rtt_hist();
+void init_rtt_hist();
+unsigned int pages_wo_ack();
+
+/* signal.c */
+typedef void Sigfunc(int); /* for signal handlers */
+Sigfunc * Signal(int signo, Sigfunc *func);
+int Fcntl(int fd, int cmd, int arg);
+int Ioctl(int fd, int request, void *arg);
+void Sigemptyset(sigset_t *set);
+void Sigaddset(sigset_t *set, int signo);
+void Sigprocmask(int how, const sigset_t *set, sigset_t *oset);
+
+/* id_map.c */
+void get_machine_names(char * filename);
+char * id2name(int id);
+
+#endif
diff --git a/src/rtt.c b/src/rtt.c
new file mode 100644
index 0000000..71c8bd0
--- /dev/null
+++ b/src/rtt.c
@@ -0,0 +1,258 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+/*
+ To measure round trip time (RTT) using UDP
+*/
+
+#include "rttmain.h"
+#include <limits.h> /* to define PATH_MAX */
+#include <libgen.h>
+
+char * my_MCAST_ADDR = MCAST_ADDR;
+int my_FLOW_PORT = FLOW_PORT;
+int my_PORT = PORT;
+int my_TTL = MCAST_TTL;
+int my_LOOP = MCAST_LOOP;
+char * my_IFname = MCAST_IF;
+
+int no_feedback_count;
+char * machine = NULL;
+int remote_pid;
+char * reshell = REMOTE_SHELL;
+char catcher_path[PATH_MAX];
+
+void usage()
+{
+ fprintf(stderr,
+ "rtt (to measure rount trip time to a target version %s\n"
+ " Option list:\n"
+ " [ -v flag to turn on verbose]\n"
+ " [ -r <remote shell, default=%s> ]\n"
+ " [ -p <PATH for remote rttcatcher, default=%s> ]\n"
+ " -------- essential options ----------------------------------\n"
+ " -m <destination machine_name>\n"
+ " -n <num Of Pages>\n"
+ " -s <page_size in bytes; max=64512>\n"
+ " -------- mcast options --------------------------------------\n"
+ " [ -A <my_mcast_address default=%s)> ]\n"
+ " [ -P <my_PORT default=%d> ]\n"
+ " [ -T <my_TTL default=%d> ]\n"
+ " [ -L flag turn on mcast_LOOP. default is off ]\n"
+ " [ -I <my_MCAST_IF default=NULL> ]\n",
+ VERSION, reshell, catcher_path, my_MCAST_ADDR, my_PORT, my_TTL);
+}
+
+void get_dirname_of_rtt(char *dname, char *rtt_path)
+{
+ char path[PATH_MAX];
+ strcpy(path, rtt_path); /* dirname() will change its argument */
+ strcpy(dname, dirname(path));
+
+ if (strcmp(dname, ".")==0) {
+ strcpy(dname, getcwd(path, PATH_MAX));
+ }
+}
+
+void do_one_page(int page)
+{
+ unsigned long rtt;
+ refresh_timer();
+ start_timer();
+ send_page(page);
+ /* read_handle_complaint() waits n*interpage_interval at most */
+ if (read_handle_complaint()==0) { /* delay_sec for readable() is set by set_delay() */
+ /*
+ At this point, the readable() returns without getting a reply
+ from the target after n*DT_PERPAGE
+ This indicates that the page has likely been lost in the network.
+ */
+ if (verbose) fprintf(stderr, "no ack for page = %d\n", page);
+ ++no_feedback_count;
+ update_rtt_hist(999999); /* register this as rtt = infinite --- the last element in rtt_hist */
+ if (no_feedback_count>NO_FEEDBACK_COUNT_MAX) { /* count the consecutive no_feedback event */
+ /* switch to another client */
+ fprintf(stderr, "Consecutive non_feedback exceeds limit. Continue with next page...\n");
+ no_feedback_count = 0;
+ }
+ } else {
+ end_timer();
+ update_time_accumulator();
+ rtt = get_accumulated_usec();
+ /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */
+ /* to do: update histogram */
+ if (verbose>=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt);
+ update_rtt_hist(rtt);
+
+ no_feedback_count = 0;
+ }
+}
+
+int invoke_catcher(char * machine)
+{
+ FILE *ptr;
+ char buf[PATH_MAX];
+
+ /* invoke rttcatcher on remote machine */
+ fprintf(stderr, "using %s to invoke rttcatcher on %s\n", reshell, machine);
+
+ /* check if rsh (ssh) works */
+ sprintf(buf, "%s %s date", reshell, machine);
+ if (verbose) fprintf(stderr, "%s\n", buf);
+ if (system(buf)) {
+ fprintf(stderr, "cannot do rsh to the target machine = %s\n", machine);
+ exit(BAD_EXIT);
+ }
+
+ if (!my_IFname) {
+ sprintf(buf,
+ "%s %s '%s/rttcatcher -A %s -P %d < /dev/null 1>/dev/null 2>/dev/null & echo $!'",
+ reshell, machine, catcher_path, my_MCAST_ADDR, my_PORT);
+ } else {
+ sprintf(buf,
+ "%s %s '%s/rttcatcher -A %s -P %d -I %s < /dev/null 1>/dev/null 2>/dev/null & echo $!'",
+ reshell, machine, catcher_path, my_MCAST_ADDR, my_PORT, my_IFname);
+ }
+
+
+ fprintf(stderr, "%s\n", buf);
+ if ((ptr = popen(buf, "r")) == NULL) {
+ fprintf(stderr, "Failure to invoke rttcather\n");
+ exit(-1);
+ }
+ fgets(buf, PATH_MAX, ptr);
+ pclose(ptr);
+
+ return atoi(buf);
+}
+
+int main(int argc, char *argv[])
+{
+ int c;
+ int nPages =-1, pageSize=-1, ipage;
+
+ verbose = 0;
+ catcher_path[0] = '\0';
+
+ while ((c = getopt(argc, argv, "vm:s:n:r:p:A:P:T:LI:")) != EOF) {
+ switch (c) {
+ case 'v':
+ verbose = 1;
+ break;
+ case 'r':
+ reshell = optarg;
+ break;
+ case 'p':
+ strcpy(catcher_path, optarg);
+ break;
+ case 'A':
+ my_MCAST_ADDR = optarg;
+ break;
+ case 'P':
+ my_PORT = atoi(optarg);
+ my_FLOW_PORT = my_PORT -1;
+ break;
+ case 'T':
+ my_TTL = atoi(optarg);
+ break;
+ case 'L':
+ my_LOOP = TRUE;
+ break;
+ case 'I':
+ my_IFname = optarg;
+ break;
+ case 'n' :
+ nPages = atoi(optarg);
+ break;
+ case 'm':
+ machine = optarg;
+ break;
+ case 's':
+ pageSize = atoi(optarg);
+ if (pageSize>MAX_PAGE_SIZE) {
+ usage();
+ exit(-1);
+ }
+ break;
+ case '?':
+ usage();
+ exit(-1);
+ }
+ }
+
+ if (strlen(catcher_path)==0) {
+ get_dirname_of_rtt(catcher_path, argv[0]);
+ }
+
+
+ if (nPages == -1 || pageSize == -1 || machine == NULL) {
+ fprintf(stderr, "Essential options (-n -m -s) should be specified. \n");
+ usage();
+ exit(-1);
+ }
+
+ /* init */
+ init_sends(pageSize);
+ init_complaints();
+
+ /* set up Cntl_C catcher */
+ Signal(SIGINT, do_cntl_c);
+
+ remote_pid = invoke_catcher(machine);
+ fprintf(stderr, "remote pid = %d\n", remote_pid);
+
+ sleep(1);
+
+ /* -------------------Send data-------------------------------------- */
+
+ init_missing_page_flag(nPages);
+
+ send_cmd(START_CMD, nPages);
+ refresh_machine_status();
+ wait_for_ok(START_CMD);
+ do_badMachines_exit(machine, remote_pid);
+
+ fprintf(stderr, "Sending data...\n");
+ /* send pages */
+ set_mode(SENDING_DATA);
+ set_delay(0, DT_PERPAGE*FACTOR);
+
+ no_feedback_count = 0;
+ for (ipage = 0; ipage < nPages; ipage++){
+ do_one_page(ipage);
+ }
+
+ send_cmd(EOF_CMD, 0);
+ refresh_machine_status();
+ wait_for_ok(EOF_CMD);
+ do_badMachines_exit("", -1);
+
+ /* -----------------end of send data -------------------------------- */
+
+ free_missing_page_flag();
+ send_done_and_pr_msgs();
+ return 0;
+}
+
diff --git a/src/rttcatcher.c b/src/rttcatcher.c
new file mode 100644
index 0000000..1af74f1
--- /dev/null
+++ b/src/rttcatcher.c
@@ -0,0 +1,118 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Codes in this file are extracted and modified from multicatcher.c.
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+
+char * my_MCAST_ADDR = MCAST_ADDR;
+int my_FLOW_PORT = FLOW_PORT;
+int my_PORT = PORT;
+char * my_IFname = MCAST_IF;
+
+void usage()
+{
+ fprintf(stderr,
+ "rttcatcher (to receive pages on the target. version %s\n"
+ " Option list:\n"
+ " [ -v flag to turn on verbose]\n"
+ " -------- mcast options --------------------------------------\n"
+ " [ -A <my_mcast_address default=%s)> ]\n"
+ " [ -P <my_PORT default=%d> ]\n"
+ " [ -I <my_MCAST_IF default=NULL> ]\n",
+ VERSION, MCAST_ADDR, PORT);
+}
+
+int main(int argc, char *argv[])
+{
+ int old_mode; /* hp: from char to int for mode */
+ int mode;
+ int c;
+
+ verbose = 0;
+ while ((c = getopt(argc, argv, "vA:P:I:")) != EOF) {
+ switch (c) {
+ case 'v':
+ verbose = 1;
+ break;
+ case 'A':
+ my_MCAST_ADDR = optarg;
+ break;
+ case 'P':
+ my_PORT = atoi(optarg);
+ my_FLOW_PORT = my_PORT -1;
+ break;
+ case 'I':
+ my_IFname = optarg;
+ break;
+ case '?':
+ usage();
+ exit(-1);
+ }
+ }
+
+ init_page_reader();
+ init_complaint_sender();
+
+ /* initialize random numbers */
+ srand(time(NULL) + getpid());
+
+ /* Wait forever if necessary for first packet */
+ set_delay(0, -1);
+ mode = old_mode = TEST; /* hp: add mode */
+
+ while(1) { /* loop for all incoming pages */
+ if (verbose)
+ fprintf(stderr, "Starting listen loop with mode %d\n", mode);
+
+ mode = read_handle_page();
+ if (verbose) fprintf(stderr, "in mode %d\n", mode);
+
+ if (mode == ALL_DONE_CMD) break;
+
+ /* got no data? */
+ if (mode == TIMED_OUT) {
+ if (verbose) fprintf(stderr, "*");
+ continue;
+ } /* end if TIMED_OUT */
+
+ /* changing modes? */
+ if ((old_mode != SENDING_DATA) && (mode == SENDING_DATA)){
+ /* Taking data, wait at least 3 to 8 seconds */
+ set_delay( 3 + rand() % 8, 200);
+ if (verbose) fprintf(stderr, "Receiving data\n");
+ old_mode = mode;
+ continue;
+ }
+
+ /* all other modes */
+ old_mode = mode;
+
+ } /* end of incoming page loop */
+
+ if (verbose) fprintf(stderr, "Done!\n");
+ return 0;
+}
+
+
diff --git a/src/rttcomplaint_sender.c b/src/rttcomplaint_sender.c
new file mode 100644
index 0000000..20d7cda
--- /dev/null
+++ b/src/rttcomplaint_sender.c
@@ -0,0 +1,103 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Codes in this file are extracted and modified from complaint_sender.c
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+
+/* send socket */
+int complaint_fd;
+#ifndef IPV6
+struct sockaddr_in complaint_addr;
+#else
+struct sockaddr_in6 complaint_addr;
+#endif
+
+extern int my_FLOW_PORT;
+
+/* send buffer */
+char complaint_buffer[FLOW_BUFFSIZE];
+int *ccode_ptr; /* change from char to int -- mem alignment */
+int *cpage_ptr;
+
+/*----------------------------------------------------------
+ init_complaint_sender initializes the buffer to allow the
+ catcher to send complaints back to the sender.
+
+ ret_address of sender to whom we will complain
+ is determined when we receive the first UDP data
+ in read_handle_page() in page_reader.c
+ ----------------------------------------------------------*/
+void init_complaint_sender()
+{
+ if (verbose)
+ fprintf(stderr, "in init_complaint_sender\n");
+
+ /* init the send_socket */
+ complaint_fd = complaint_socket(&complaint_addr, my_FLOW_PORT);
+
+ ccode_ptr = (int *) complaint_buffer;
+ cpage_ptr = (int *)(ccode_ptr + 1);
+}
+
+#ifndef IPV6
+void update_complaint_address(struct sockaddr_in *sa)
+{
+ sock_set_addr((struct sockaddr *) &complaint_addr,
+ sizeof(complaint_addr), (void*)&sa->sin_addr);
+}
+#else
+void update_complaint_address(struct sockaddr_in6 *sa)
+{
+ sock_set_addr((struct sockaddr *) &complaint_addr,
+ sizeof(complaint_addr), (void*)&sa->sin6_addr);
+}
+#endif
+
+/*------------------------------------------------------------------------
+ send_complaint fills the complaint buffer and send it through our socket
+ back to the sender
+
+ The major use is to tell master machine which page of which file
+ needs to be re-transmitted.
+ complaint -- the complain code defined in main.h
+ file -- the file index
+ page -- page index
+ ------------------------------------------------------------------------*/
+void send_complaint(int complaint, int page)
+{
+ /* fill in the complaint data */
+ /* 20060323 add converting to network byte-order before sending out */
+ *ccode_ptr = htonl(complaint);
+ *cpage_ptr = htonl(page);
+
+ /* send it */
+ if( sendto(complaint_fd, complaint_buffer, FLOW_BUFFSIZE, 0,
+ (const struct sockaddr *)&complaint_addr,
+ sizeof(complaint_addr)) < 0) {
+ perror("Sending complaint\n");
+ }
+ if (verbose)
+ printf("Sent complaint:code=%d page=%d\n", complaint, page);
+}
diff --git a/src/rttcomplaints.c b/src/rttcomplaints.c
new file mode 100644
index 0000000..4f9ed9b
--- /dev/null
+++ b/src/rttcomplaints.c
@@ -0,0 +1,270 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ codes in this file are extracted and modified from complaints.c
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+#include <sys/times.h>
+
+/* buffer for receiving complaints */
+
+char flow_buff[FLOW_BUFFSIZE];
+int *code_ptr; /* What's wrong? */
+int *page_ptr; /* Which page */
+
+/* receive socket */
+int complaint_fd;
+#ifndef IPV6
+struct sockaddr_in complaint_addr;
+#else
+struct sockaddr_in6 complaint_addr;
+#endif
+
+extern int my_FLOW_PORT;
+
+/* status */
+char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */
+int total_missing_page = 0;
+char machine_status = NOT_READY;
+int nMachines = 1;
+int nPages;
+char *machine_list_file;
+
+extern char * machine;
+extern int remote_pid;
+extern char * reshell;
+
+/*
+ init_complaints initializes our buffers to receive complaint information
+ from the catchers
+*/
+void init_complaints ()
+{
+ if (verbose)
+ fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE);
+
+ /* Buffer */
+ code_ptr = (int *)flow_buff;
+ page_ptr = (int *)(code_ptr + 1);
+
+ /* Receive socket */
+ if (verbose) printf("set up receive socket for complaints\n");
+ complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT);
+}
+
+void init_missing_page_flag(int n)
+{
+ int i;
+ nPages = n;
+ if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) {
+ fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n);
+ perror("error = ");
+ exit(-1);
+ }
+ for(i=0; i<nPages; ++i) {
+ missing_page_flag[i] = RECEIVED;
+ }
+}
+
+void page_sent(int page)
+{
+ missing_page_flag[page] = RECEIVED;
+}
+
+void free_missing_page_flag()
+{
+ free(missing_page_flag);
+ missing_page_flag = NULL;
+}
+
+
+void refresh_machine_status()
+{
+ machine_status = NOT_READY;
+}
+
+int get_total_missing_pages()
+{
+ return total_missing_page;
+}
+
+int read_handle_complaint()
+{
+ /*
+ return 1 for receiving complaint
+ return 0 for no complaint handled
+ */
+ int code_v, page_v, bytes_read;
+
+ if (readable(complaint_fd)) {
+
+ /* There is a complaint */
+ bytes_read = recvfrom(complaint_fd, flow_buff, FLOW_BUFFSIZE, 0, NULL, NULL);
+
+ /* 20060323 deal with big- vs little-endian issue
+ convert incoming integers into host representation */
+
+ if (bytes_read != FLOW_BUFFSIZE) return 0;
+
+ code_v = ntohl(*code_ptr);
+ page_v = ntohl(*page_ptr);
+
+ switch (code_v) {
+ case PAGE_RECV:
+ return 1;
+ case START_OK :
+ case EOF_OK :
+ machine_status = MACHINE_OK;
+ return 1;
+ case MISSING_PAGE :
+ if (page_v > nPages) return 1; /* *page_ptr = page # (1 origin) */
+ ++(total_missing_page);
+ missing_page_flag[(page_v)-1] = MISSING;
+ return 1;
+ case LAST_MISSING :
+ if (page_v > nPages) return 1;
+ ++(total_missing_page);
+ missing_page_flag[page_v-1] = MISSING;
+ machine_status = MACHINE_OK;
+ return 1;
+ default :
+ printf("Unknown complaint: %d\n", code_v);
+ return 0;
+ } /* end of switch */
+ } /* end of if(readable) */
+
+ return 0;
+}
+
+int all_machine_ok()
+{
+ return (machine_status == NOT_READY ) ? 0 : 1;
+}
+
+void wait_for_ok(int code)
+{
+ int i, count;
+ time_t tloc;
+ time_t rtime0, rtime1;
+
+ rtime0 = time(&tloc); /* reference time */
+
+ count = 0;
+ while (!all_machine_ok()) {
+ if (read_handle_complaint()==1) { /* if there is a complaint handled */
+ rtime0 = time(&tloc); /* reset the reference time */
+ continue;
+ }
+ /* no complaints handled */
+ rtime1 = time(&tloc); /* time since last complaints */
+ if ((rtime1-rtime0) >= ACK_WAIT_PERIOD) {
+ ++count;
+ if (count < ACK_WAIT_TIMES) {
+ fprintf(stderr, "%d: resend cmd(%d) to machines:[ ", count, code);
+ for(i=0; i<nMachines; ++i) {
+ if (machine_status == NOT_READY) {
+ fprintf(stderr, "%d ", i);
+ send_cmd(code, (int) i);
+ usleep(FAST);
+ }
+ }
+ fprintf(stderr, "]\n");
+ rtime0 = rtime1;
+ } else {
+ fprintf(stderr, "Time out for the 'bad' machines:[ ");
+ for(i=0; i<nMachines; ++i) {
+ if (machine_status== NOT_READY) {
+ fprintf(stderr, "%d ", i);
+ }
+ }
+ fprintf(stderr, "]\n");
+ break;
+ }
+ }
+ }
+}
+
+int is_it_missing(int page)
+{
+ return (missing_page_flag[page]==MISSING) ? 1 : 0;
+}
+
+int has_missing_pages()
+{
+ int i;
+ for(i=0; i<nPages; ++i) {
+ if (missing_page_flag[i] == MISSING) {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+void pr_missing_pages()
+{
+ int N;
+
+ N = get_total_missing_pages();
+ fprintf(stderr, "Missing pages = %10d (%5.2f%%) out of total pages = %d\n",
+ N, (double)N/((double)nPages)*100.0, nPages);
+}
+
+void send_done_and_pr_msgs()
+{
+ send_all_done_cmd();
+ pr_missing_pages();
+ pr_rtt_hist();
+}
+
+void kill_pid()
+{
+ char cmd[100];
+ /* kill remote process in case where remote machine is not in the multicast network */
+ sprintf(cmd, "%s %s 'kill -9 %d'", reshell, machine, remote_pid);
+ fprintf(stderr, "To make sure we clean up the process on remote machine,\n");
+ fprintf(stderr, "%s\n", cmd);
+ system(cmd);
+}
+
+void do_cntl_c(int signo)
+{
+ fprintf(stderr, "Control_C interrupt detected!\n");
+ send_done_and_pr_msgs();
+ kill_pid();
+ exit(-1);
+}
+
+/* to do some cleanup before exit IF all machines are bad */
+void do_badMachines_exit(char* machine, int pid)
+{
+ if (machine_status != NOT_READY) return;
+ fprintf(stderr, "Remote machine is bad. Exit!\n");
+ send_done_and_pr_msgs();
+
+ if (pid > 0) {
+ kill_pid();
+ }
+
+ exit(-1);
+}
diff --git a/src/rttmain.h b/src/rttmain.h
new file mode 100644
index 0000000..78eb693
--- /dev/null
+++ b/src/rttmain.h
@@ -0,0 +1,126 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef __main_h
+#define __main_h
+
+#include <time.h>
+#include <utime.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
+#include <arpa/inet.h> /* inet(3) functions */
+#include <errno.h>
+#include <fcntl.h> /* for nonblocking */
+#include <sys/ioctl.h>
+#include <netdb.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/stat.h> /* for S_xxx file mode constants */
+#include <sys/uio.h> /* for iovec{} and readv/writev */
+#include <unistd.h>
+#include <sys/wait.h>
+#include <sys/time.h> /* timeval{} for select() */
+
+#define VERSION "3.1.0"
+
+/* logic values */
+#define FALSE 0
+#define TRUE 1
+#define FAIL (FALSE)
+#define SUCCESS (TRUE)
+#define GOOD_EXIT 0
+#define BAD_EXIT -1
+
+/* Ports and addresses */
+#define PORT 7900 /* for multicast */
+#define FLOW_PORT (PORT-1) /* for flow-control */
+#define MCAST_ADDR "239.255.67.200"
+#define MCAST_TTL 1
+#define MCAST_LOOP FALSE
+#define MCAST_IF NULL
+
+#define REMOTE_SHELL "rsh"
+
+#define NO_FEEDBACK_COUNT_MAX 5
+#define USEC_TO_IDLE 1000000
+
+/* Speed stuff */
+#define FAST 100 /* usec */
+#define DT_PERPAGE 8000 /* usec time interval between pages */
+#define FACTOR 50
+
+/* time for the master to wait for the acknowledgement */
+#define ACK_WAIT_PERIOD 1 /* secs (from time()); */
+#define ACK_WAIT_TIMES 60 /* wait for this many periods */
+
+/* complaints */
+#define TOO_FAST 100
+#define SEND_AGAIN 200
+#define START_OK 300
+#define MISSING_PAGE 500
+#define LAST_MISSING 600
+#define EOF_OK 700
+#define PAGE_RECV 800
+
+#define FLOW_BUFFSIZE (2 * sizeof(int))
+
+#define PAGE_SIZE 64512 /* max page_size allowed */
+#define HEAD_SIZE (3 * sizeof(int))
+#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE)
+#define TOTAL_REC_PAGE 20 /* 31 20 */
+
+/* Modes */
+#define TIMED_OUT 0
+#define TEST 1
+#define SENDING_DATA 2
+#define RESENDING_DATA 3
+#define START_CMD 4
+#define EOF_CMD 5
+#define ALL_DONE_CMD 6
+#define NULL_CMD 7
+
+/* machine status */
+#define MACHINE_OK '\1'
+#define NOT_READY '\0'
+
+/* PAGE STATUS */
+#define MISSING '\0'
+#define RECEIVED '\1'
+
+/* MACHINE STATE */
+#define IDLE_STATE 0
+#define GET_DATA_STATE 1
+#define DATA_READY_STATE 2
+
+#define MAX_PAGE_SIZE 64512
+
+int verbose;
+
+#include "rttproto.h"
+
+#endif
diff --git a/src/rttmissings.c b/src/rttmissings.c
new file mode 100644
index 0000000..774e8b2
--- /dev/null
+++ b/src/rttmissings.c
@@ -0,0 +1,93 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+
+char * missingPages = NULL; /* array of flags */
+int nPages;
+
+int init_missingPages(int n)
+{
+ int i;
+
+ nPages = n;
+
+ /* init missingPages flags */
+ if (missingPages != NULL) free(missingPages); /* for second round */
+ missingPages = malloc(sizeof(char) * nPages);
+ for(i=0; i < nPages; ++i)
+ missingPages[i] = MISSING;
+
+ return 0;
+}
+
+int get_total_pages()
+{
+ return nPages;
+}
+
+int missing_pages()
+{
+ int result;
+ int i;
+
+ result = 0;
+
+ for(i=0; i < nPages; ++i)
+ if ((missingPages[i]) == MISSING) ++result;
+ return result;
+}
+
+int is_missing(int page)
+{
+ return (missingPages[page] == MISSING) ? 1 : 0;
+}
+
+void page_received(int page)
+{
+ missingPages[page] = RECEIVED;
+}
+
+int ask_for_missing_page()
+{
+ int i;
+ int n, count;
+
+ n = missing_pages();
+ if (n == 0) {
+ /* send_complaint(EOF_OK, machineID, 0); */
+ return 0; /* nothing is missing */
+ }
+
+ count = 0;
+ for(i=0; i < nPages; ++i) {
+ if ( missingPages[i] == MISSING ) {
+ ++count;
+ send_complaint((count==n) ? LAST_MISSING : MISSING_PAGE, i+1);
+ usleep(DT_PERPAGE);
+ }
+ }
+
+ return 1; /* there is something missing */
+}
+
diff --git a/src/rttpage_reader.c b/src/rttpage_reader.c
new file mode 100644
index 0000000..35940e5
--- /dev/null
+++ b/src/rttpage_reader.c
@@ -0,0 +1,188 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Codes in this file are extraced and modified from page_reader.c
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+
+/* the following is needed on Sun but not on linux */
+#ifdef _SUN
+#include <sys/filio.h>
+#endif
+
+extern char * my_MCAST_ADDR; /* defined in rttcatcher.c */
+extern int my_PORT;
+extern char * my_IFname;
+int machineState;
+int isFirstPage = TRUE;
+
+int recfd;
+#ifndef IPV6
+struct sockaddr_in rec_addr;
+#else
+struct sockaddr_in6 rec_addr;
+#endif
+
+int *mode_ptr; /* change from char to int */
+int *total_bytes_ptr;
+int *current_page_ptr;
+char *data_ptr;
+char rec_buf[PAGE_BUFFSIZE];
+
+void init_page_reader()
+{
+ struct ip_mreq mreq;
+ int rcv_size;
+
+ machineState = IDLE_STATE;
+
+ /* Prepare buffer */
+ mode_ptr = (int*)rec_buf; /* hp: add the cast (int *) */
+ current_page_ptr = (int *)(mode_ptr + 1);
+ total_bytes_ptr = (int *)(current_page_ptr + 1);
+ data_ptr = (char *)(total_bytes_ptr + 1);
+
+ /* Set up receive socket */
+ if (verbose) fprintf(stderr, "setting up receive socket\n");
+ recfd = rec_socket(&rec_addr, my_PORT);
+
+ /* Join the multicast group */
+ if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) {
+ perror("Joining Multicast Group");
+ }
+
+ /* Increase socket receive buffer */
+ rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE;
+ if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){
+ perror("Expanding receive buffer");
+ }
+}
+
+
+/*
+ This is the heart of catcher.
+ It parses the incoming pages and do proper reactions according
+ to the mode (command code) encoded in the first 4 bytes in an UDP page.
+ It returns the mode.
+
+ Note: since rtt is intended to deal with one-to-one machine,
+ the four-state engine as in page_reader is not used.
+*/
+int read_handle_page()
+{
+ #ifndef IPV6
+ struct sockaddr_in return_addr;
+ #else
+ struct sockaddr_in6 return_addr;
+ #endif
+
+ int bytes_read;
+ socklen_t return_len = (socklen_t)sizeof(return_addr);
+ int mode_v, total_bytes_v, current_page_v;
+
+ /* -----------receiving data -----------------*/
+ if (readable(recfd) == 1) { /* there is data coming in */
+ /* get data */
+ bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0,
+ (struct sockaddr *)&return_addr,
+ (socklen_t*) &return_len);
+
+ total_bytes_v = ntohl(*total_bytes_ptr);
+ if (bytes_read != total_bytes_v) return NULL_CMD;
+
+ /* convert from network byte order to host byte order */
+ mode_v = ntohl(*mode_ptr);
+ current_page_v = ntohl(*current_page_ptr);
+
+ if (isFirstPage) {
+ update_complaint_address(&return_addr);
+ isFirstPage = FALSE;
+ }
+
+ /* get init wish list and return address first time only
+ if (firstTime){
+ if (verbose)
+ fprintf(stderr, "Initializing complaint_sender and wish_list\n");
+ init_complaint_sender(&return_addr);
+ firstTime = 0;
+ }
+ */
+
+ /* --- process various commands */
+ switch (mode_v) {
+ case TEST:
+ /* It is just a test packet? */
+ fprintf(stderr, "********** Received test packet **********\n");
+ return mode_v;
+
+ case START_CMD:
+ if (verbose)
+ fprintf(stderr, "Start cmd received ---\n");
+ init_missingPages(current_page_v); /* Here: use current_page for total_pages */
+ send_complaint(START_OK, 0);
+ return mode_v;
+
+ case EOF_CMD:
+ if (verbose)
+ fprintf(stderr, "Check and ask for missing pages ---\n");
+
+ if (ask_for_missing_page()==0) {
+ /*
+ There is no missing page.
+ */
+ send_complaint(EOF_OK, 0);
+ }
+ /*
+ else
+ There are missing pages.
+ Ack has been done in ask_for_missing_page()
+ */
+
+ return mode_v;
+
+ case SENDING_DATA:
+ case RESENDING_DATA:
+ if (verbose){
+ fprintf(stderr, "Got %d bytes from page %d of %d, mode=%d\n",
+ bytes_read - HEAD_SIZE,
+ current_page_v, get_total_pages(), mode_v);
+ }
+
+ /* mode == SENDING_DATA, RESENDING_DATA */
+ page_received(current_page_v);
+ send_complaint(PAGE_RECV, 0);
+
+ /* Yes, we read a page */
+ return mode_v;
+ case ALL_DONE_CMD: /* this is presumably a good machine */
+ default:
+ return mode_v;
+ } /* end of switch */
+ } else {
+ /* No, the read timed out */
+ return TIMED_OUT;
+ }
+}
+
+
diff --git a/src/rttproto.h b/src/rttproto.h
new file mode 100644
index 0000000..6d4cf62
--- /dev/null
+++ b/src/rttproto.h
@@ -0,0 +1,116 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef __rttproto_h
+#define __rttproto_h
+
+/* rttsends */
+void init_sends(int n);
+void set_mode(int new_mode);
+int send_page(int page);
+void send_cmd(int code, int machine_id);
+void send_all_done_cmd();
+
+/* rttcomplaints */
+void init_complaints();
+int read_handle_complaint();
+void wait_for_ok(int code);
+void refresh_machine_status();
+int is_it_missing(int page);
+int has_missing_pages();
+void init_missing_page_flag(int n);
+void free_missing_page_flag();
+void refresh_machine_status();
+int get_total_missing_pages();
+void page_sent(int page);
+void pr_missing_pages();
+void do_cntl_c(int signo);
+void send_done_and_pr_msgs();
+void do_badMachines_exit(char * machine, int pid);
+
+/* setup_socket.c */
+void set_delay(int secs, int usecs);
+int readable(int fd);
+#ifndef IPV6
+int complaint_socket(struct sockaddr_in *addr, int port);
+int send_socket(struct sockaddr_in *addr, char * cp, int port);
+int rec_socket(struct sockaddr_in *addr, int port);
+#else
+int rec_socket(struct sockaddr_in6 *addr, int port);
+int send_socket(struct sockaddr_in6 *addr, char * cp, int port);
+int complaint_socket(struct sockaddr_in6 *addr, int port);
+#endif
+
+/* set_mcast.c */
+int mcast_set_if(int sockfd, const char *ifname, u_int ifindex);
+int mcast_set_loop(int sockfd, int onoff);
+int mcast_set_ttl(int sockfd, int val);
+
+/* set_catcher_mcast.c */
+int Mcast_join(int sockfd, const char *mcast_addr,
+ const char *ifname, u_int ifindex);
+void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr);
+
+/* rttcomplaint_sender */
+void send_complaint(int complaint, int page);
+void init_complaint_sender();
+#ifndef IPV6
+void update_complaint_address(struct sockaddr_in *sa);
+#else
+void update_complaint_address(struct sockaddr_in6 *sa);
+#endif
+
+/* rttpage_reader */
+void init_page_reader();
+int read_handle_page();
+
+/* rttmissings */
+int init_missingPages(int n);
+int missing_pages();
+int is_missing(int page);
+void page_received(int page);
+int ask_for_missing_page();
+int get_total_pages();
+
+/* timing */
+void refresh_timer();
+double get_accumulated_time();
+void start_timer();
+void end_timer();
+void update_time_accumulator();
+double get_accumulated_usec();
+void update_rtt_hist(unsigned int rtt);
+void pr_rtt_hist();
+void init_rtt_hist();
+
+/* signal.c */
+typedef void Sigfunc(int); /* for signal handlers */
+Sigfunc * Signal(int signo, Sigfunc *func);
+int Fcntl(int fd, int cmd, int arg);
+int Ioctl(int fd, int request, void *arg);
+void Sigemptyset(sigset_t *set);
+void Sigaddset(sigset_t *set, int signo);
+void Sigprocmask(int how, const sigset_t *set, sigset_t *oset);
+
+#endif
diff --git a/src/rttsends.c b/src/rttsends.c
new file mode 100644
index 0000000..7038e2c
--- /dev/null
+++ b/src/rttsends.c
@@ -0,0 +1,144 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ codes in this file are extracted and modified from sends.c
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+#include <net/if.h>
+#ifdef _SUN
+#include <sys/sockio.h> /* define SIOCGIFADDR */
+#else
+#include <linux/sockios.h>
+#endif
+
+extern char * my_MCAST_ADDR;
+extern int my_PORT;
+extern int my_TTL;
+extern int my_LOOP;
+extern char * my_IFname;
+
+/* buffer for sending (same structure as those in sends.c */
+int *mode_ptr; /* char type would cause alignment problem on Sparc */
+int *current_page_ptr;
+int *total_bytes_ptr;
+char *data_ptr;
+char send_buff[PAGE_BUFFSIZE];
+
+int pageSize;
+
+/* Send socket */
+int send_fd;
+#ifndef IPV6
+struct sockaddr_in send_addr;
+#else
+struct sockaddr_in6 send_addr;
+#endif
+
+/*
+ set_mode sets the caster into a new mode.
+ modes are defined in main.h:
+*/
+void set_mode(int new_mode)
+{
+ *mode_ptr = htonl(new_mode);
+}
+
+
+/* init_sends initializes the send buffer */
+void init_sends(int npagesize)
+{
+ pageSize = (npagesize>PAGE_SIZE) ? PAGE_SIZE : npagesize;
+
+ mode_ptr = (int *)send_buff; /* hp: add (int*) */
+ current_page_ptr = (int *) (mode_ptr + 1);
+ total_bytes_ptr = (int *)(current_page_ptr + 1);
+ data_ptr = (char *)(total_bytes_ptr + 1);
+
+ send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT);
+
+ /******* change MULTICAST_IF ********/
+ if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0)
+ perror("init_sends(): when set MULTICAST_IF\n");
+
+ /* set multicast_ttl such that UDP can go to 2nd subnetwork */
+ if (mcast_set_ttl(send_fd, my_TTL) < 0)
+ perror("init_sends(): when set MULTICAST_TTL\n");
+
+ /* disable multicast_loop such that there is no echo back on master */
+ if (mcast_set_loop(send_fd, my_LOOP) < 0)
+ perror("init_sends(): when set MULTICAST_LOOP\n");
+
+ /* put dummy contents into send (UDP) buffer */
+ memset(data_ptr, 1, PAGE_SIZE);
+}
+
+/*
+ send_buffer will send the buffer with the file information
+ out to the socket connection with the catcher.
+*/
+int send_buffer(int bytes_read)
+{
+ /* Else send the data */
+ if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE,
+ 0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) {
+ perror("Sending packet");
+ exit(1);
+ }
+ return (1);
+}
+
+/*
+ send_page takes a page from the current file and controls
+ sending it out the socket to the catcher. It calls send_buffer
+ to do the actuall call to sendto.
+*/
+int send_page(int page)
+{
+ if (verbose>=2) fprintf(stderr, "in send_page\n");
+ *total_bytes_ptr = htonl(pageSize+HEAD_SIZE);
+ *current_page_ptr = htonl(page);
+
+ return send_buffer(pageSize);
+}
+
+
+void send_cmd(int code, int pages)
+{
+ *mode_ptr = htonl(code);
+ *current_page_ptr = htonl(pages);
+ *total_bytes_ptr = htonl(HEAD_SIZE);
+
+ send_buffer(0);
+}
+
+
+void send_all_done_cmd()
+{
+ *mode_ptr = htonl(ALL_DONE_CMD);
+ *current_page_ptr = 0;
+ *total_bytes_ptr = htonl(HEAD_SIZE) ;
+
+ send_buffer(0);
+ if (verbose) fprintf(stderr, "(ALL DONE)\n");
+}
diff --git a/src/sends.c b/src/sends.c
new file mode 100644
index 0000000..61aa07a
--- /dev/null
+++ b/src/sends.c
@@ -0,0 +1,329 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Following the suggestion by Robert Dack <robd@kelman.com>,
+ I added the option to change the default IP address for multicasting
+ and the PORT for flow control. See mrsync.py for the new options.
+
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <net/if.h>
+#ifdef _SUN
+#include <sys/sockio.h> /* define SIOCGIFADDR */
+#else
+#include <linux/sockios.h>
+#endif
+
+extern char * my_MCAST_ADDR; /* defined in multicaster.c */
+extern int my_PORT; /* ditto */
+extern int my_TTL;
+extern int my_LOOP;
+extern char * my_IFname; /* defined in multicaster.c */
+extern int verbose;
+
+extern unsigned int nPages;
+extern unsigned int last_bytes; /* the number of bytes in the last page of a file */
+extern int cur_entry;
+extern int file_changed;
+extern char* cmd_name[];
+
+/* Where have we last sent from? */
+int most_recent_file;
+int most_recent_fd;
+
+/*
+ Send buffer for storing the data and to be transmitted thru UDP
+ The format:
+ (5*sizeof(int) bytes header) + (PAGE_SIZE data area)
+
+ The header has five int_type (4 bytes) int's.
+ (1) mode -- for master to give instructions to the target machines.
+ (2) current file index (starting with 1)
+ (3) current page index (starting with 1) -- gee!
+ (4) bytes to be sent in this page via UPD
+ (5) total number of pages for this file
+
+ data_ptr points to the data area.
+*/
+int *mode_ptr; /* char type would cause alignment problem on Sparc */
+int *current_file_ptr;
+int *current_page_ptr;
+int *bytes_sent_ptr;
+int *total_pages_ptr;
+char *data_ptr;
+char *fill_here;
+char send_buff[PAGE_BUFFSIZE];
+
+/* Send socket */
+int send_fd;
+#ifndef IPV6
+struct sockaddr_in send_addr;
+#else
+struct sockaddr_in6 send_addr;
+#endif
+
+/* for final statistics */
+extern unsigned int total_pages;
+extern off_t total_bytes;
+off_t real_total_bytes;
+unsigned int real_total_pages;
+
+/*
+ set_mode sets the caster into a new mode.
+ modes are defined in main.h:
+*/
+void set_mode(int new_mode)
+{
+ /* 20060323 convert it to network byte order */
+ *mode_ptr = htonl(new_mode);
+}
+
+/* init_sends initializes the send buffer */
+void init_sends()
+{
+ most_recent_file = most_recent_fd = -99999;
+ real_total_bytes = 0;
+ real_total_pages = 0;
+
+ /* pointers for send buffer */
+ mode_ptr = (int *)send_buff;
+ current_file_ptr = (int *)(mode_ptr+1);
+ current_page_ptr = (int *)(current_file_ptr + 1);
+ bytes_sent_ptr = (int *)(current_page_ptr + 1);
+ total_pages_ptr = (int *)(bytes_sent_ptr + 1);
+ data_ptr = (char *)(total_pages_ptr + 1);
+ fill_here = data_ptr;
+
+ /* send socket */
+ send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT);
+
+ /******* change MULTICAST_IF ********/
+ if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0)
+ perror("init_sends(): when set MULTICAST_IF\n");
+
+ /* set multicast_ttl such that UDP can go to 2nd subnetwork */
+ if (mcast_set_ttl(send_fd, my_TTL) < 0)
+ perror("init_sends(): when set MULTICAST_TTL\n");
+
+ /* disable multicast_loop such that there is no echo back on master */
+ if (mcast_set_loop(send_fd, my_LOOP) < 0)
+ perror("init_sends(): when set MULTICAST_LOOP\n");
+}
+
+void clear_send_buf()
+{
+ fill_here = data_ptr;
+}
+
+/*
+ put file contents into send (UDP) buffer
+ return the number of bytes put into the buffer.
+*/
+ssize_t pack_page_for_file(int page)
+{
+ if(verbose>=2)
+ fprintf(stderr, "Sending page %d of file %d\n", page, current_entry());
+
+ /***
+ Adjust the position for reading
+ NOTE:if the type (off_t) is not given, the large file operation
+ would fail.
+ ***/
+ lseek(most_recent_fd, (off_t)PAGE_SIZE * (off_t)(page - 1), SEEK_SET);
+
+ /* read it and put the content into send_buff */
+ /* the max number this read() will return is PAGE_SIZE */
+ return read(most_recent_fd, data_ptr, PAGE_SIZE);
+}
+
+int fexist(int entry)
+{
+ if (entry != most_recent_file) {
+ if (most_recent_fd > 0) close(most_recent_fd); /* make sure we close it */
+
+ if((most_recent_fd = open(getFullname(), O_RDONLY, 0)) < 0){
+ perror(getFullname());
+ }
+ most_recent_file = entry;
+ }
+ return (most_recent_fd >= 0); /* <0 means FAIL */
+}
+
+
+/*
+ send_buffer will send the buffer with the file information
+ out to the socket connection with the catcher.
+ return 0 -- ok, -1 sent failed.
+*/
+int send_buffer(int bytes_read)
+{
+ /* send the data */
+ if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE,
+ 0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) {
+ perror("Sending packet");
+ return FAIL;
+ }
+ return SUCCESS;
+}
+
+/*
+ send_page takes a page from the current file and controls
+ sending it out the socket to the catcher. It calls send_buffer
+ to do the actuall call to sendto.
+*/
+int send_page(int page)
+{
+ unsigned int bytes;
+
+ if (verbose>=2) fprintf(stderr, "In send_page()\n");
+
+ if (file_changed) {
+ total_bytes -= ((page<nPages) ? PAGE_SIZE : last_bytes);
+ --total_pages;
+ return FAIL;
+ }
+
+ bytes = (unsigned int) pack_page_for_file(page);
+
+ if ((page < nPages && bytes != PAGE_SIZE) ||
+ (nPages == page && bytes != last_bytes)) {
+ /* file under sync must have been changed during syncing */
+ fprintf(stderr, "Warning: read() error, expected_bytes= %d, real= %d, "
+ "page = %d, nPages = %s%d, for file %s\n",
+ (page<nPages) ? PAGE_SIZE : last_bytes,
+ bytes, page, (current_entry()<0) ? "-" : "", nPages, getFullname());
+ file_changed = TRUE;
+ /* make corrections in total_xxx that we send
+ otherwise the final statistic will be messed up */
+ total_bytes -= ((page<nPages) ? PAGE_SIZE : last_bytes);
+ --total_pages;
+ return FAIL;
+ }
+
+ /* fill in the header data */
+ /* 20060323 -- add htonl so that the codes can work across
+ different machines with either little- or big-endian.
+ So, before we send out ints, we convert them to network-byte order*/
+ *total_pages_ptr = htonl(nPages);
+ *bytes_sent_ptr = htonl(bytes);
+ *current_file_ptr = htonl(cur_entry);
+ *current_page_ptr = htonl(page);
+
+ /* for final statistics */
+ ++real_total_pages;
+ real_total_bytes += bytes;
+
+ if (verbose>=3)
+ fprintf(stderr, "Sending page=%d of %d in file %d of %d\n",
+ page, nPages,
+ cur_entry, total_entries());
+ return send_buffer(bytes);
+}
+
+/*
+ send_test zeroes out the buffers going to the catcher
+ and thereby sends a test packet to the catcher.
+*/
+void send_test()
+{
+ *mode_ptr = htonl(TEST); /* --> network byte order */
+
+ *current_file_ptr = 0;
+ *current_page_ptr = 0;
+ *total_pages_ptr = 0;
+ *bytes_sent_ptr = 0;
+
+ send_buffer(0);
+ fprintf(stderr, "Test packet is sent.\n");
+}
+
+void send_cmd(int code, int machine_id)
+{
+ /*
+ Except for OPEN_FILE_CMD,
+ only the header area in the send_buff gets filled
+ with data.
+ */
+ *mode_ptr = htonl(code); /* --> network byte order */
+ *current_page_ptr = htonl(machine_id); /* -1 being all machines */
+ *current_file_ptr = htonl(cur_entry);
+ *total_pages_ptr = htonl(nPages);
+
+ *bytes_sent_ptr = (code == OPEN_FILE_CMD) ? htonl(fill_here - data_ptr) :0;
+
+ /* do the header in send_buf */
+ send_buffer((code == OPEN_FILE_CMD) ? fill_here - data_ptr : 0);
+
+ /* print message */
+ if (verbose>=2) {
+ fprintf(stderr, "cmd [%s] sent\n", cmd_name[code]);
+ }
+}
+
+void pack_open_file_info()
+{
+ /* prepare udp send_buffer for file info
+ (header) (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0
+ */
+ clear_send_buf();
+ fill_here += fill_in_stat(data_ptr);
+ fill_here += fill_in_filename(fill_here);
+ if (is_softlink() || is_hardlink()) {
+ fill_here += fill_in_linktar(fill_here);
+ }
+}
+
+void send_all_done_cmd()
+{
+ int i;
+ *mode_ptr = htonl(ALL_DONE_CMD); /* --> network byte order */
+
+ *current_file_ptr = 0;
+ *current_page_ptr = 0;
+ *total_pages_ptr = 0;
+ *bytes_sent_ptr = 0;
+
+ for(i=0; i<10; ++i) { /* do it many times, in case network is busy */
+ send_buffer(0);
+ usleep(DT_PERPAGE*10);
+ }
+ /* NOTE: it is still possible that ALL_DONE msg could not
+ be received by targets.
+ For total robustness, some independent checking on targets
+ should be done.
+ */
+ fprintf(stderr, "(ALL DONE)\n");
+}
+
+void my_exit(int good_or_bad)
+{
+ if (send_fd) send_all_done_cmd();
+ exit(good_or_bad);
+}
diff --git a/src/set_catcher_mcast.c b/src/set_catcher_mcast.c
new file mode 100644
index 0000000..9a04017
--- /dev/null
+++ b/src/set_catcher_mcast.c
@@ -0,0 +1,142 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This file collects functions related to setting multicast
+ for multicatcher. They are IPv4 and IPv6 ready.
+ By default, we use IPv4.
+ To use IPv6, we need to specify -DIPv6 in Makefile.
+ The functions in this file are collected from
+ Richard Stevens' Networking bible: Unix Network programming
+
+ I added Mcast_join().
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <net/if.h>
+#ifdef _SUN
+#include <sys/sockio.h> /* define SIOCGIFADDR */
+#else
+#include <linux/sockios.h>
+#endif
+
+#define SA struct sockaddr
+
+int mcast_join(int sockfd, const SA *sa, socklen_t salen,
+ const char *ifname, u_int ifindex)
+{
+ switch (sa->sa_family) {
+ case AF_INET: {
+ struct ip_mreq mreq;
+ struct ifreq ifreq;
+
+ memcpy(&mreq.imr_multiaddr,
+ &((struct sockaddr_in *) sa)->sin_addr,
+ sizeof(struct in_addr));
+
+ if (ifindex > 0) {
+ if (if_indextoname(ifindex, ifreq.ifr_name) == NULL) {
+ errno = ENXIO; /* i/f index not found */
+ return(-1);
+ }
+ goto doioctl;
+ } else if (ifname != NULL) {
+ strncpy(ifreq.ifr_name, ifname, IFNAMSIZ);
+doioctl:
+ if (ioctl(sockfd, SIOCGIFADDR, &ifreq) < 0)
+ return(-1);
+ memcpy(&mreq.imr_interface,
+ &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr,
+ sizeof(struct in_addr));
+ } else
+ mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+
+ return(setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+ &mreq, sizeof(mreq)));
+ }
+
+#ifdef IPV6
+ case AF_INET6: {
+ struct ipv6_mreq mreq6;
+
+ memcpy(&mreq6.ipv6mr_multiaddr,
+ &((struct sockaddr_in6 *) sa)->sin6_addr,
+ sizeof(struct in6_addr));
+
+ if (ifindex > 0)
+ mreq6.ipv6mr_interface = ifindex;
+ else if (ifname != NULL)
+ if ( (mreq6.ipv6mr_interface = if_nametoindex(ifname)) == 0) {
+ errno = ENXIO; /* i/f name not found */
+ return(-1);
+ }
+ else
+ mreq6.ipv6mr_interface = 0;
+
+ return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
+ &mreq6, sizeof(mreq6)));
+ }
+#endif
+
+ default:
+ errno = EPROTONOSUPPORT;
+ return(-1);
+ }
+}
+
+int Mcast_join(int sockfd, const char *mcast_addr,
+ const char *ifname, u_int ifindex)
+{
+ #ifndef IPV6
+ /* IPv4 */
+ struct sockaddr_in sa;
+ sa.sin_family = AF_INET;
+ inet_pton(AF_INET, mcast_addr, &sa.sin_addr);
+ #else
+ struct sockaddr_in6 sa;
+ sa.sin6_family = AF_INET6;
+ inet_pton(AF_INET6, mcast_addr, &sa.sin6_addr);
+ #endif
+
+ return (mcast_join(sockfd, (struct sockaddr *) &sa, sizeof(sa),
+ ifname, ifindex));
+}
+
+void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr)
+{
+ switch (sa->sa_family) {
+ case AF_INET: {
+ struct sockaddr_in *sin = (struct sockaddr_in *) sa;
+
+ memcpy(&sin->sin_addr, addr, sizeof(struct in_addr));
+ return;
+ }
+
+ #ifdef IPV6
+ case AF_INET6: {
+ struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *) sa;
+
+ memcpy(&sin6->sin6_addr, addr, sizeof(struct in6_addr));
+ return;
+ }
+ #endif
+ }
+
+ return;
+}
+
diff --git a/src/set_mcast.c b/src/set_mcast.c
new file mode 100644
index 0000000..aac3d36
--- /dev/null
+++ b/src/set_mcast.c
@@ -0,0 +1,160 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This file collects functions related to setting multicast
+ for multicaster. They are IPv4 and IPv6 ready.
+ By default, we use IPv4.
+ To use IPv6, we need to specify -DIPv6 in Makefile.
+ The functions in this file are collected from
+ Richard Stevens' Networking bible: Unix Network programming
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <net/if.h>
+#ifdef _SUN
+#include <sys/sockio.h> /* define SIOCGIFADDR */
+#else
+#include <linux/sockios.h>
+#endif
+
+#define SA struct sockaddr
+#define MAXSOCKADDR 128 /* max socket address structure size */
+
+int sockfd_to_family(int sockfd)
+{
+ union {
+ struct sockaddr sa;
+ char data[MAXSOCKADDR];
+ } un;
+ socklen_t len;
+
+ len = MAXSOCKADDR;
+ if (getsockname(sockfd, (SA *) un.data, &len) < 0)
+ return(-1);
+ return(un.sa.sa_family);
+}
+
+int mcast_set_if(int sockfd, const char *ifname, u_int ifindex)
+{
+ switch (sockfd_to_family(sockfd)) {
+ case AF_INET: {
+ struct in_addr inaddr;
+ struct ifreq ifreq;
+
+ if (ifindex > 0) {
+ if (if_indextoname(ifindex, ifreq.ifr_name) == NULL) {
+ errno = ENXIO; /* i/f index not found */
+ return(-1);
+ }
+ goto doioctl;
+ } else if (ifname != NULL) {
+ strncpy(ifreq.ifr_name, ifname, IFNAMSIZ);
+doioctl:
+ if (ioctl(sockfd, SIOCGIFADDR, &ifreq) < 0)
+ return(-1);
+ memcpy(&inaddr,
+ &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr,
+ sizeof(struct in_addr));
+ } else
+ inaddr.s_addr = htonl(INADDR_ANY); /* remove prev. set default */
+
+ return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_IF,
+ &inaddr, sizeof(struct in_addr)));
+ }
+
+#ifdef IPV6
+ case AF_INET6: {
+ u_int index;
+
+ if ( (index = ifindex) == 0) {
+ if (ifname == NULL) {
+ errno = EINVAL; /* must supply either index or name */
+ return(-1);
+ }
+ if ( (index = if_nametoindex(ifname)) == 0) {
+ errno = ENXIO; /* i/f name not found */
+ return(-1);
+ }
+ }
+ return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_IF,
+ &index, sizeof(index)));
+ }
+#endif
+
+ default:
+ errno = EPROTONOSUPPORT;
+ return(-1);
+ }
+}
+
+int mcast_set_loop(int sockfd, int onoff)
+{
+ switch (sockfd_to_family(sockfd)) {
+ case AF_INET: {
+ u_char flag;
+
+ flag = onoff;
+ return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_LOOP,
+ &flag, sizeof(flag)));
+ }
+
+#ifdef IPV6
+ case AF_INET6: {
+ u_int flag;
+
+ flag = onoff;
+ return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
+ &flag, sizeof(flag)));
+ }
+#endif
+
+ default:
+ errno = EPROTONOSUPPORT;
+ return(-1);
+ }
+}
+/* end mcast_set_loop */
+
+int mcast_set_ttl(int sockfd, int val)
+{
+ switch (sockfd_to_family(sockfd)) {
+ case AF_INET: {
+ u_char ttl;
+
+ ttl = val;
+ return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL,
+ &ttl, sizeof(ttl)));
+ }
+
+#ifdef IPV6
+ case AF_INET6: {
+ int hop;
+
+ hop = val;
+ return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
+ &hop, sizeof(hop)));
+ }
+#endif
+
+ default:
+ errno = EPROTONOSUPPORT;
+ return(-1);
+ }
+}
+
diff --git a/src/setup_socket.c b/src/setup_socket.c
new file mode 100644
index 0000000..3625a2a
--- /dev/null
+++ b/src/setup_socket.c
@@ -0,0 +1,242 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ make it IPv6-ready
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ file name is changed from main.c to setup_socket.c
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+/*
+ This part was based on
+ (1) codes by Aaron Hillegass <aaron@classmax.com>
+ (2) codes in Steven's book 'network programming'
+
+ 200605 change it to make it IPv6 ready
+
+*/
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
+#include <arpa/inet.h> /* inet(3) functions */
+#include <errno.h>
+
+extern int verbose;
+int delay_sec;
+int delay_usec;
+
+
+/* Set up for catcher's complaint socket */
+#ifndef IPV6
+int complaint_socket(struct sockaddr_in *addr, int port)
+#else
+int complaint_socket(struct sockaddr_in6 *addr, int port)
+#endif
+{
+ int fd, sockaddr_len;
+ sa_family_t family;
+ if (verbose>=2) fprintf(stderr, "in send_socket_ip\n");
+
+ #ifndef IPV6
+ /* IPv4 */
+ sockaddr_len = sizeof(struct sockaddr_in);
+ memset(addr, 0, sockaddr_len);
+ addr->sin_family = AF_INET;
+ family = AF_INET;
+ addr->sin_port = htons(port);
+ /*addr->sin_addr.s_addr = ip; this fx in for init process only
+ This ip will be overwritten later after
+ 1st packet is received. */
+ #else
+ /* IPv6 */
+ sockaddr_len = sizeof(struct sockaddr_in6);
+ memset(addr, 0, sockaddr_len);
+ addr->sin6_family = AF_INET6;
+ family = AF_INET6;
+ addr->sin6_port = htons(port);
+ /* addr->sin_addr.s_addr = ip; see comments above */
+ #endif
+
+ if ((fd = socket(family, SOCK_DGRAM, 0)) < 0){
+ perror("Send socket");
+ exit(1);
+ }
+ return fd;
+}
+
+/* Set up mcast send socket for multicaster based on (char*)cp */
+#ifndef IPV6
+int send_socket(struct sockaddr_in *addr, char * cp, int port)
+#else
+int send_socket(struct sockaddr_in6 *addr, char * cp, int port)
+#endif
+{
+ int fd, sockaddr_len;
+ sa_family_t family;
+ char buf[50];
+ if (verbose>=2) fprintf(stderr, "in send_socket\n");
+
+ #ifndef IPV6
+ /* IPv4 */
+ sockaddr_len = sizeof(struct sockaddr_in);
+ memset(addr, 0, sockaddr_len);
+ addr->sin_family = AF_INET;
+ family = AF_INET;
+ addr->sin_port = htons(port);
+ /*addr->sin_addr.s_addr = inet_addr(cp);*/
+ inet_pton(AF_INET, cp, &addr->sin_addr);
+ /* Print out IP address and port */
+ inet_ntop(AF_INET, &addr->sin_addr, buf, 50);
+ #else
+ sockaddr_len = sizeof(struct sockaddr_in6);
+ memset(addr, 0, sockaddr_len);
+ addr->sin6_family = AF_INET6;
+ family = AF_INET6;
+ addr->sin6_port = htons(port);
+ /*addr->sin_addr.s_addr = inet_addr(cp);*/
+ inet_pton(AF_INET6, cp, &addr->sin6_addr);
+ /* Print out IP address and port */
+ inet_ntop(AF_INET6, &addr->sin6_addr, buf, 50);
+ #endif
+
+ if (verbose>=2)
+ fprintf(stderr, "Creating a send socket to %s:%d\n", buf, port);
+
+ if ((fd = socket(family, SOCK_DGRAM, 0)) < 0){
+ perror("Send socket");
+ exit(1);
+ }
+
+ if ((bind(fd, (const struct sockaddr *)addr, sockaddr_len)) < 0){
+ perror("in send_socket(): bind error (need to change MCAST_ADDR)");
+ exit(1);
+ }
+ return fd; /*send_socket_ip(addr, address, port);*/
+}
+
+/* set up socket on the receiving end */
+#ifndef IPV6
+int rec_socket(struct sockaddr_in *addr, int port)
+#else
+int rec_socket(struct sockaddr_in6 *addr, int port)
+#endif
+{
+ int fd, sockaddr_len;
+ sa_family_t family;
+
+ #ifndef IPV6
+ /* IPv4 */
+ sockaddr_len = sizeof(struct sockaddr_in);
+ memset(addr, 0, sockaddr_len);
+ addr->sin_family = AF_INET;
+ family = AF_INET;
+ addr->sin_port = htons(port);
+ addr->sin_addr.s_addr = htonl(INADDR_ANY);
+ #else
+ sockaddr_len = sizeof(struct sockaddr_in6);
+ memset(addr, 0, sockaddr_len);
+ addr->sin6_family = AF_INET6;
+ family = AF_INET6;
+ addr->sin6_port = htons(port);
+ addr->sin6_addr = in6addr_any; /* RS book: page 92 */
+ #endif
+
+ if((fd = socket(family, SOCK_DGRAM, 0)) < 0){
+ perror("Socket create");
+ exit(1);
+ }
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, NULL, 0);
+
+ if (verbose>=2)
+ fprintf(stderr, "Creating receive socket on port %d\n", port);
+
+ /*if ((bind(fd, addr, sizeof(*addr))) < 0){ MOD by RWM: replaced by next line */
+ if ((bind(fd, (const struct sockaddr *)addr, sockaddr_len)) < 0){
+ perror("in rec_socket(): bind error (need to change PORT)");
+ exit(1);
+ }
+ return fd;
+}
+
+/*
+ set the values of two variables to be used by select()
+ in readable().
+*/
+void set_delay(int secs, int usecs)
+{
+ if (verbose>=2) {
+ if (usecs == -1)
+ fprintf(stderr, "Timeout: set to infinite\n");
+ else
+ fprintf(stderr, "Timeout: set to %d sec + %d usec\n", secs, usecs);
+ }
+ delay_sec = secs;
+ delay_usec = usecs;
+}
+
+void get_delay(int * secs, int * usecs)
+{
+ *secs = delay_sec;
+ *usecs= delay_usec;
+}
+
+/*
+ Check if there is an incoming UDP for a certain amount
+ of time period specified in delay_tv..
+
+ Return 'true' if there is.
+ Return 'false' if no valid UDP has arrived within the time period.
+
+ In multicaster, this is used in read_handle_page() right after
+ send_page() is carried out. As a result, read_handle_complaint() waits
+ for any incoming complaints from any target machines within
+ the specified time period. As it is now, no target machine
+ will send back complaints during the transmission of all the
+ pages for a file. Therefore, read_handle_complaint() serves effectively
+ as a time delay between sending of each page.
+*/
+int readable(int fd)
+{
+ struct timeval delay_tv;
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(fd, &rset);
+
+ /*
+ if microsec == -1 wait forever for a packet.
+ This is used in the beginning when multicatcher is just
+ invoked.
+ */
+ if (delay_usec == -1){
+ return(select(fd + 1, &rset, NULL, NULL, NULL));
+ } else {
+ delay_tv.tv_sec = delay_sec;
+ delay_tv.tv_usec = delay_usec;
+ return (select(fd + 1, &rset, NULL, NULL, &delay_tv));
+ }
+}
+
diff --git a/src/signal.c b/src/signal.c
new file mode 100644
index 0000000..5b49b52
--- /dev/null
+++ b/src/signal.c
@@ -0,0 +1,93 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ The code in this file is copied from
+ Richard Stevens' book
+ "UNIX Network Programming" Chap.22.3
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "signal.h"
+
+Sigfunc * signal(int signo, Sigfunc *func)
+{
+ struct sigaction act, oact;
+
+ act.sa_handler = func;
+ sigemptyset(&act.sa_mask);
+ act.sa_flags = 0;
+ if (signo == SIGALRM) {
+#ifdef SA_INTERRUPT
+ act.sa_flags |= SA_INTERRUPT; /* SunOS 4.x */
+#endif
+ } else {
+#ifdef SA_RESTART
+ act.sa_flags |= SA_RESTART; /* SVR4, 44BSD */
+#endif
+ }
+ if (sigaction(signo, &act, &oact) < 0)
+ return(SIG_ERR);
+ return(oact.sa_handler);
+}
+/* end signal */
+
+Sigfunc * Signal(int signo, Sigfunc *func) /* for our signal() function */
+{
+ Sigfunc *sigfunc;
+
+ if ( (sigfunc = signal(signo, func)) == SIG_ERR)
+ perror("signal error");
+ return(sigfunc);
+}
+
+int Fcntl(int fd, int cmd, int arg)
+{
+ int n;
+
+ if ( (n = fcntl(fd, cmd, arg)) == -1)
+ perror("fcntl error");
+ return(n);
+}
+
+int Ioctl(int fd, int request, void *arg)
+{
+ int n;
+
+ if ( (n = ioctl(fd, request, arg)) == -1)
+ perror("ioctl error");
+ return(n); /* streamio of I_LIST returns value */
+}
+
+void Sigemptyset(sigset_t *set)
+{
+ if (sigemptyset(set) == -1)
+ perror("sigemptyset error");
+}
+
+void Sigaddset(sigset_t *set, int signo)
+{
+ if (sigaddset(set, signo) == -1)
+ perror("sigaddset error");
+}
+
+void Sigprocmask(int how, const sigset_t *set, sigset_t *oset)
+{
+ if (sigprocmask(how, set, oset) == -1)
+ perror("sigprocmask error");
+}
diff --git a/src/signal.h b/src/signal.h
new file mode 100644
index 0000000..bf309f8
--- /dev/null
+++ b/src/signal.h
@@ -0,0 +1,32 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include <sys/types.h>
+#include <stdio.h>
+#include <signal.h>
+#include <unistd.h>
+/******* on linux: ioctl() is defined in sys/ioctl.h instead of unistd.h as on SunOS *****/
+#include <sys/ioctl.h>
+#include <fcntl.h>
+
+typedef void Sigfunc(int); /* for signal handlers */
+
diff --git a/src/timing.c b/src/timing.c
new file mode 100644
index 0000000..d4baa15
--- /dev/null
+++ b/src/timing.c
@@ -0,0 +1,109 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include <sys/time.h>
+#include <stdio.h>
+
+#define N 501 /* effectively set the maximum time in rtt_hist to be 500 msec */
+
+extern int verbose;
+
+/* for timing */
+struct timeval tv0, tv1;
+unsigned long usec_acc, sec_acc; /* accumulator of timing */
+unsigned int rtt_hist[N]; /* rtt_hist[i] = count of rtt within (i, i+1) */
+
+void refresh_timer()
+{
+ usec_acc = 0;
+ sec_acc = 0;
+}
+
+void start_timer()
+{
+ struct timezone tz;
+ gettimeofday(&tv0, &tz);
+}
+
+void end_timer()
+{
+ struct timezone tz;
+ gettimeofday(&tv1, &tz); /* end timer -------- */
+}
+
+void update_time_accumulator()
+{
+ if (tv1.tv_usec<tv0.tv_usec) {
+ sec_acc += (tv1.tv_sec - tv0.tv_sec - 1);
+ usec_acc += (1000000 + tv1.tv_usec - tv0.tv_usec);
+ } else {
+ sec_acc += (tv1.tv_sec - tv0.tv_sec);
+ usec_acc += (tv1.tv_usec - tv0.tv_usec);
+ }
+}
+
+double get_accumulated_time()
+{
+ double sec = sec_acc;
+ sec += (usec_acc / 1e6);
+ return sec;
+}
+
+double get_accumulated_usec()
+{
+ double usec = usec_acc;
+ usec += (sec_acc*1e6);
+ return usec;
+}
+
+void init_rtt_hist()
+{
+ int i;
+ for(i=0; i<N; ++i) rtt_hist[i] = 0;
+}
+
+void update_rtt_hist(unsigned int rtt)
+{
+ unsigned int index;
+ index = rtt / 1000;
+ if (index>(N-2)) index = N-1;
+ rtt_hist[index]++;
+}
+
+void pr_rtt_hist()
+{
+ int i;
+ fprintf(stderr, "rtt histogram\n");
+ fprintf(stderr, "msec counts\n");
+ fprintf(stderr, "---- --------\n");
+ for(i=0; i<N; ++i) {
+ if (verbose<=1 && i>10) continue;
+ if (rtt_hist[i] != 0) {
+ fprintf(stderr, "%4d %u\n", i, rtt_hist[i]);
+ }
+ }
+}
+
+unsigned int pages_wo_ack()
+{
+ return rtt_hist[N-1];
+}
diff --git a/src/trFilelist.c b/src/trFilelist.c
new file mode 100644
index 0000000..8f65796
--- /dev/null
+++ b/src/trFilelist.c
@@ -0,0 +1,449 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+/***this code parse the synclist file generated by rsync in dry-run mode.
+ The minimum options for rsync is : -avW --dry-run --delete
+ e.g.
+ /usr/local/bin/rsync --rsync-path=/usr/local/bin/rsync
+ -avW --dry-run --delete
+ /src/path/ dest_machine:/target/path/ > output 2>&1
+A typical output of rsync may look like this:
+Client is very old version of rsync, upgrade recommended.
+building file list ... done
+xyz -> ./sub1/xyz
+file1
+fn
+fn1
+j
+sub1/hardlink_to_file1
+sub1/path/testfile
+sub1/xyz
+sent 337 bytes read 44 bytes 254.00 bytes/sec
+total size is 320751046 speedup is 841866.26
+
+This code does the following three things.
+(1) skip the lines before 'done' and after 'wrote'
+(2) output all directories and file_path
+ e.g
+ for an entry: sub1/path/testfile, the output is
+ sub1
+ sub1/path
+ sub1/path/testfile
+(3) xyz -> ./sub1/xyz
+ the output is
+ xyz
+(4) If file1 and sub1/hardlink are hardlinked
+ the output is
+ file1
+ sub1/hardlink file1
+
+For the example output above, the output of this code is:
+
+xyz
+file1
+fn
+fn1
+j
+sub1
+sub1/hardlink_to_file1 file1
+sub1/path
+sub1/path/testfile
+sub1/xyz
+
+***/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <string.h>
+#include <limits.h> /* to define PATH_MAX */
+#include <sys/stat.h>
+
+#define TRUE 1
+#define FALSE 0
+
+struct string_list {
+ int capacity;
+ char ** endp;
+ char ** str;
+};
+
+void init_string_list(struct string_list * str_ptr, int n)
+{
+ str_ptr->str = malloc(n * sizeof(void*));
+ str_ptr->capacity = n;
+ str_ptr->endp = str_ptr->str;
+}
+
+void grow_string_list(struct string_list * slp)
+{
+ int new_capacity = 2 * slp->capacity;
+ char ** old_ptrs = slp->str;
+ char ** new_ptrs;
+ char ** newp = malloc(new_capacity * sizeof(void *));
+ new_ptrs = newp;
+
+ while (old_ptrs < slp->endp) {
+ *new_ptrs++ = *old_ptrs++;
+ }
+
+ free(slp->str);
+ slp->str = newp;
+ slp->endp= new_ptrs;
+ slp->capacity = new_capacity;
+}
+
+void append_string_list(char * str, struct string_list * slp)
+{
+ if (slp->endp - slp->str == slp->capacity) grow_string_list(slp);
+ *slp->endp = strdup(str);
+ (slp->endp)++;
+}
+
+/*************** change to return index ****/
+int find_string(char * str, struct string_list * slp)
+{
+ /* find if str is in the list */
+ int i;
+ int n = slp->endp - slp->str;
+
+ for(i=0; i<n; ++i) {
+ if (strcmp((slp->str)[i], str)==0) return i;
+ }
+ return -1;
+}
+
+/* find if a string in string-list is a sub-string of str */
+int has_sub_string(char * str, struct string_list *slp)
+{
+ int i;
+ int n = slp->endp - slp->str;
+
+ for(i=0; i<n; ++i) {
+ if (strncmp(str, (slp->str)[i], strlen((slp->str)[i]))==0) return i;
+ }
+ return -1;
+}
+
+/* find if the str is a substr of those in slp */
+int has_newdir(char *str, struct string_list *slp)
+{
+ int i;
+ int n = slp->endp - slp->str;
+
+ for(i=0; i<n; ++i) {
+ if (strncmp((slp->str)[i],str, strlen(str))==0) return i;
+ }
+ return -1;
+}
+
+struct uint_list {
+ int capacity;
+ unsigned int * endp;
+ unsigned int * d;
+};
+
+void init_uint_list(struct uint_list * uil_ptr, int n)
+{
+ uil_ptr->d = malloc(n * sizeof(unsigned int));
+ uil_ptr->capacity = n;
+ uil_ptr->endp = uil_ptr->d;
+}
+
+void grow_uint_list(struct uint_list * uilp)
+{
+ int new_capacity = 2 * uilp->capacity;
+ unsigned int * old_ptrs = uilp->d;
+ unsigned int * new_ptrs;
+ unsigned int * newp = malloc(new_capacity * sizeof(unsigned int));
+ new_ptrs = newp;
+
+ while (old_ptrs < uilp->endp) {
+ *new_ptrs++ = *old_ptrs++;
+ }
+
+ free(uilp->d);
+ uilp->d = newp;
+ uilp->endp= new_ptrs;
+ uilp->capacity = new_capacity;
+}
+
+void append_uint_list(unsigned int data, struct uint_list * uilp)
+{
+ if (uilp->endp - uilp->d == uilp->capacity) grow_uint_list(uilp);
+ *uilp->endp = data;
+ (uilp->endp)++;
+}
+/*************** change to return index ****/
+int find_unit(unsigned int data, struct uint_list * uilp)
+{
+ /* find if data is in the list */
+ int i;
+ int n = uilp->endp - uilp->d;
+
+ for(i=0; i<n; ++i) {
+ if ((uilp->d)[i] == data) return i;
+ }
+ return -1;
+}
+
+struct string_list file_list;
+struct uint_list ino_list;
+struct string_list dir_list;
+struct string_list softlink_list; /* for (a) */
+struct string_list newdir_list; /* for (b) */
+
+void strip(char * str)
+{
+ /* remove trailing \n and spaces */
+ char *pt;
+ char *pc = &str[strlen(str)-1];
+ while (*pc == ' ' || *pc == '\n') *(pc--) = '\0';
+ /* 20080317 remove leading spaces */
+ pt = pc = &str[0];
+ while (*pc == ' ') ++pc;
+ if (pc != pt) {
+ while (*pc != '\0') *pt++ = *pc++;
+ *pt = '\0';
+ }
+}
+
+void output_subs(char * str)
+{
+ return; /*************************** testing ***************/
+ /* to do (2) indicated in the above */
+ /********
+ char * pc;
+ char subs[PATH_MAX];
+ pc = strstr(str, "/");
+ if (!pc) return;
+
+ while (pc) {
+ strncpy(subs, str, pc-str);
+ subs[pc-str] = '\0';
+ if (find_string(subs, &dir_list)<0) {
+ printf("%s\n", subs);
+ append_string_list(subs, &dir_list);
+ }
+ pc = strstr(pc+1, "/");
+ }
+ ************/
+}
+
+/*** (a)
+ get those softlinks that points to a directory
+ this is to deal with the following scenario
+ previous structure
+ dir_path (a directory)
+ db (a directory)
+
+ newly updated structure on master
+ dir_path -> db
+ db
+
+ rsync --dry-run generates
+ dir_path -> db [a link is done on target]
+ deleting dir_path/sub/filename1 [wrong file gets removed ]
+ deleting dir_path/sub/filename2...
+
+ file_operations.c does this when dir_path -> db is due
+ delete dir_path (rm -rf)
+ make the softlink
+ But then the following delete will have undesired deletion.
+
+ ------------------------------------------------------------
+
+ (b)
+ t0 name -> xyz name -> xyz (target)
+ t1 name/ name -> xyz
+
+ rsync generates
+ name/ update_directory() won't have effect
+ name/f1 delivered to wrong place
+ name/f2
+ deleting name too late
+ ** the deletion should be done before not after.
+ For now, I will fail this code for this situation.
+
+***/
+void get_dir_softlinks(char *filename, char * basedir) {
+ FILE * fd;
+ char line[PATH_MAX];
+ struct stat st;
+
+ if ((fd = fopen(filename, "r")) == NULL) {
+ fprintf(stderr, "Cannot open file -- %s \n", filename);
+ exit(-1);
+ }
+
+ while (1) { /* for each line in the file */
+ char *pc;
+ char fn[PATH_MAX];
+
+ if (fgets(line, PATH_MAX, fd)==NULL) break;
+ strip(line);
+ if (strlen(line) == 0) continue; /* skip blank line */
+
+ /* the softlink case is indicated by -> */
+ pc= strstr(line, " -> ");
+ if (pc) { /* it is a softlink */
+ *pc = '\0';
+ /* check if it is a directory */
+ sprintf(fn, "%s/%s", basedir, line);
+
+ /* check if the link-target is a directory */
+ if (stat(fn, &st)<0) continue; /* We skip this bad entry - no longer exist */
+
+ if (S_ISDIR(st.st_mode)) {
+ append_string_list(line, &softlink_list);
+ }
+ } else { /* not a softlink --> find if it is a directory */
+ /* find a line without ' ' and with trailing '/' */
+ pc = strstr(line, " "); /* the first space */
+ if (!pc) {
+ char * plast = &line[0] + strlen(line) - 1;
+ if (*plast == '/') {
+ append_string_list(line, &newdir_list);
+ }
+ }
+ }
+ }
+
+ fclose(fd);
+}
+
+
+int main(int argc, char * argv[])
+{
+ char * filename;
+ char * basedir;
+ FILE *fd;
+ char line[PATH_MAX];
+
+ if (argc < 3) {
+ fprintf(stderr, "Usage: trFilelist synclist_filename basedir\n");
+ exit(-1);
+ }
+
+ filename = argv[1];
+ basedir = argv[2];
+
+ init_string_list(&file_list, 10);
+ init_uint_list(&ino_list, 10);
+ init_string_list(&dir_list, 100);
+ init_string_list(&softlink_list, 10);
+ init_string_list(&newdir_list, 100);
+
+ get_dir_softlinks(filename, basedir);
+
+ if ((fd = fopen(filename, "r")) == NULL) {
+ fprintf(stderr, "Cannot open file -- %s \n", filename);
+ return -1;
+ }
+
+ while (1) { /* for each line in the file */
+ char *pc;
+ char fn[PATH_MAX];
+ struct stat st;
+ int newdir_flag;
+
+ if (fgets(line, PATH_MAX, fd)==NULL) break;
+ strip(line);
+ if (strlen(line) == 0) continue; /* skip blank line */
+ if (strcmp(line, ".")==0) continue;
+ if (strcmp(line, "./")==0) continue;
+
+ /* first we look for deleting entry */
+ if (strncmp(line, "deleting ", 9)==0) {
+ /* deleting (directory) file_path */
+ char * p1, *p2, *pf;
+
+ p1 = strstr(line, " "); /* the first space */
+ p2 = strstr(p1+1, " "); /* deleting directory filepath * 20070912 this is old */
+ pf = (p2) ? p2+1 : p1+1;/* it's always p1+1 */
+
+ newdir_flag = has_newdir(pf, &newdir_list);
+
+ if ((has_sub_string(pf, &softlink_list)<0) && newdir_flag<0) {
+ /* see comments above get_dir_softlinks() */
+ printf("deleting %s\n", pf);
+ } else if (newdir_flag>=0) { /* temporary action */
+ /*** we can simply skip this block later. 20070912 ***/
+ /***/
+ fprintf(stderr, "CRITICAL ERROR: An old softlink has been changed to a directory!\n");
+ fprintf(stderr, " For now, we crash this code for human intervention\n");
+ fprintf(stderr, " line= %s\n", line);
+ exit(-1);
+ /***/
+ }
+
+ continue;
+ }
+
+ /* the softlink case is indicated by -> */
+ pc= strstr(line, " -> ");
+ if (pc) {
+ *pc = '\0';
+ output_subs(line);
+ printf("%s\n", line);
+ continue;
+ }
+
+ /* if rsync's -H is turned on, the output may contain
+ file => tar_hardlink_file (relative address)
+ */
+ pc= strstr(line, " => ");
+ if (pc) {
+ *pc = '\0';
+ output_subs(line);
+ printf("%s %s\n", line, pc+4);
+ continue;
+ }
+
+ /* the rest of the entries should be valid paths */
+ sprintf(fn, "%s/%s", basedir, line);
+ if (lstat(fn, &st)<0) continue; /* We skip this bad entry -
+ (1) the header and tail lines
+ (2) perhaps the file no longer exists */
+
+ /* is this a hardlink? */
+ if (st.st_nlink > 1) {
+ int index;
+ output_subs(line);
+ if ((index = find_unit((unsigned int)st.st_ino, &ino_list))<0) {
+ append_uint_list((unsigned int)st.st_ino, &ino_list);
+ append_string_list(line, &file_list); /* relative path */
+ printf("%s\n", line);
+ } else {
+ printf("%s %s\n", line, file_list.str[index]);
+ }
+ continue;
+ }
+
+ /* all others */
+ output_subs(line);
+ printf("%s\n", line);
+ } /* end of one line */
+
+ fclose(fd);
+ return 0;
+}