aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--GPL.License280
-rw-r--r--Makefile70
-rw-r--r--Makefile.Sun71
-rw-r--r--README302
-rw-r--r--README.more353
-rw-r--r--backup.c108
-rwxr-xr-xcmdToTarget.py43
-rw-r--r--complaint_sender.c158
-rw-r--r--complaints.c579
-rw-r--r--file_operations.c800
-rw-r--r--global.c35
-rw-r--r--id_map.c74
-rw-r--r--main.h189
-rwxr-xr-xmrsync.py284
-rw-r--r--mrsync_config.py67
-rw-r--r--multicaster.c582
-rw-r--r--multicatcher.c181
-rw-r--r--page_reader.c426
-rw-r--r--parse_synclist.c320
-rw-r--r--proto.h182
-rw-r--r--rtt.c258
-rw-r--r--rttcatcher.c118
-rw-r--r--rttcomplaint_sender.c103
-rw-r--r--rttcomplaints.c270
-rw-r--r--rttmain.h126
-rw-r--r--rttmissings.c93
-rw-r--r--rttpage_reader.c188
-rw-r--r--rttproto.h116
-rw-r--r--rttsends.c144
-rw-r--r--sends.c329
-rw-r--r--set_catcher_mcast.c142
-rw-r--r--set_mcast.c160
-rw-r--r--setup_socket.c242
-rw-r--r--signal.c93
-rw-r--r--signal.h32
-rw-r--r--timing.c109
-rw-r--r--trFilelist.c449
37 files changed, 8076 insertions, 0 deletions
diff --git a/GPL.License b/GPL.License
new file mode 100644
index 0000000..960fe74
--- /dev/null
+++ b/GPL.License
@@ -0,0 +1,280 @@
+ GNU GENERAL PUBLIC LICENSE
+ Version 2, June 1991
+
+ Copyright (C) 1989, 1991 Free Software Foundation, Inc.
+ 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+ Preamble
+
+ The licenses for most software are designed to take away your
+freedom to share and change it. By contrast, the GNU General Public
+License is intended to guarantee your freedom to share and change free
+software--to make sure the software is free for all its users. This
+General Public License applies to most of the Free Software
+Foundation's software and to any other program whose authors commit to
+using it. (Some other Free Software Foundation software is covered by
+the GNU Library General Public License instead.) You can apply it to
+your programs, too.
+
+ When we speak of free software, we are referring to freedom, not
+price. Our General Public Licenses are designed to make sure that you
+have the freedom to distribute copies of free software (and charge for
+this service if you wish), that you receive source code or can get it
+if you want it, that you can change the software or use pieces of it
+in new free programs; and that you know you can do these things.
+
+ To protect your rights, we need to make restrictions that forbid
+anyone to deny you these rights or to ask you to surrender the rights.
+These restrictions translate to certain responsibilities for you if you
+distribute copies of the software, or if you modify it.
+
+ For example, if you distribute copies of such a program, whether
+gratis or for a fee, you must give the recipients all the rights that
+you have. You must make sure that they, too, receive or can get the
+source code. And you must show them these terms so they know their
+rights.
+
+ We protect your rights with two steps: (1) copyright the software, and
+(2) offer you this license which gives you legal permission to copy,
+distribute and/or modify the software.
+
+ Also, for each author's protection and ours, we want to make certain
+that everyone understands that there is no warranty for this free
+software. If the software is modified by someone else and passed on, we
+want its recipients to know that what they have is not the original, so
+that any problems introduced by others will not reflect on the original
+authors' reputations.
+
+ Finally, any free program is threatened constantly by software
+patents. We wish to avoid the danger that redistributors of a free
+program will individually obtain patent licenses, in effect making the
+program proprietary. To prevent this, we have made it clear that any
+patent must be licensed for everyone's free use or not licensed at all.
+
+ The precise terms and conditions for copying, distribution and
+modification follow.
+
+ GNU GENERAL PUBLIC LICENSE
+ TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
+
+ 0. This License applies to any program or other work which contains
+a notice placed by the copyright holder saying it may be distributed
+under the terms of this General Public License. The "Program", below,
+refers to any such program or work, and a "work based on the Program"
+means either the Program or any derivative work under copyright law:
+that is to say, a work containing the Program or a portion of it,
+either verbatim or with modifications and/or translated into another
+language. (Hereinafter, translation is included without limitation in
+the term "modification".) Each licensee is addressed as "you".
+
+Activities other than copying, distribution and modification are not
+covered by this License; they are outside its scope. The act of
+running the Program is not restricted, and the output from the Program
+is covered only if its contents constitute a work based on the
+Program (independent of having been made by running the Program).
+Whether that is true depends on what the Program does.
+
+ 1. You may copy and distribute verbatim copies of the Program's
+source code as you receive it, in any medium, provided that you
+conspicuously and appropriately publish on each copy an appropriate
+copyright notice and disclaimer of warranty; keep intact all the
+notices that refer to this License and to the absence of any warranty;
+and give any other recipients of the Program a copy of this License
+along with the Program.
+
+You may charge a fee for the physical act of transferring a copy, and
+you may at your option offer warranty protection in exchange for a fee.
+
+ 2. You may modify your copy or copies of the Program or any portion
+of it, thus forming a work based on the Program, and copy and
+distribute such modifications or work under the terms of Section 1
+above, provided that you also meet all of these conditions:
+
+ a) You must cause the modified files to carry prominent notices
+ stating that you changed the files and the date of any change.
+
+ b) You must cause any work that you distribute or publish, that in
+ whole or in part contains or is derived from the Program or any
+ part thereof, to be licensed as a whole at no charge to all third
+ parties under the terms of this License.
+
+ c) If the modified program normally reads commands interactively
+ when run, you must cause it, when started running for such
+ interactive use in the most ordinary way, to print or display an
+ announcement including an appropriate copyright notice and a
+ notice that there is no warranty (or else, saying that you provide
+ a warranty) and that users may redistribute the program under
+ these conditions, and telling the user how to view a copy of this
+ License. (Exception: if the Program itself is interactive but
+ does not normally print such an announcement, your work based on
+ the Program is not required to print an announcement.)
+
+These requirements apply to the modified work as a whole. If
+identifiable sections of that work are not derived from the Program,
+and can be reasonably considered independent and separate works in
+themselves, then this License, and its terms, do not apply to those
+sections when you distribute them as separate works. But when you
+distribute the same sections as part of a whole which is a work based
+on the Program, the distribution of the whole must be on the terms of
+this License, whose permissions for other licensees extend to the
+entire whole, and thus to each and every part regardless of who wrote it.
+
+Thus, it is not the intent of this section to claim rights or contest
+your rights to work written entirely by you; rather, the intent is to
+exercise the right to control the distribution of derivative or
+collective works based on the Program.
+
+In addition, mere aggregation of another work not based on the Program
+with the Program (or with a work based on the Program) on a volume of
+a storage or distribution medium does not bring the other work under
+the scope of this License.
+
+ 3. You may copy and distribute the Program (or a work based on it,
+under Section 2) in object code or executable form under the terms of
+Sections 1 and 2 above provided that you also do one of the following:
+
+ a) Accompany it with the complete corresponding machine-readable
+ source code, which must be distributed under the terms of Sections
+ 1 and 2 above on a medium customarily used for software interchange; or,
+
+ b) Accompany it with a written offer, valid for at least three
+ years, to give any third party, for a charge no more than your
+ cost of physically performing source distribution, a complete
+ machine-readable copy of the corresponding source code, to be
+ distributed under the terms of Sections 1 and 2 above on a medium
+ customarily used for software interchange; or,
+
+ c) Accompany it with the information you received as to the offer
+ to distribute corresponding source code. (This alternative is
+ allowed only for noncommercial distribution and only if you
+ received the program in object code or executable form with such
+ an offer, in accord with Subsection b above.)
+
+The source code for a work means the preferred form of the work for
+making modifications to it. For an executable work, complete source
+code means all the source code for all modules it contains, plus any
+associated interface definition files, plus the scripts used to
+control compilation and installation of the executable. However, as a
+special exception, the source code distributed need not include
+anything that is normally distributed (in either source or binary
+form) with the major components (compiler, kernel, and so on) of the
+operating system on which the executable runs, unless that component
+itself accompanies the executable.
+
+If distribution of executable or object code is made by offering
+access to copy from a designated place, then offering equivalent
+access to copy the source code from the same place counts as
+distribution of the source code, even though third parties are not
+compelled to copy the source along with the object code.
+
+ 4. You may not copy, modify, sublicense, or distribute the Program
+except as expressly provided under this License. Any attempt
+otherwise to copy, modify, sublicense or distribute the Program is
+void, and will automatically terminate your rights under this License.
+However, parties who have received copies, or rights, from you under
+this License will not have their licenses terminated so long as such
+parties remain in full compliance.
+
+ 5. You are not required to accept this License, since you have not
+signed it. However, nothing else grants you permission to modify or
+distribute the Program or its derivative works. These actions are
+prohibited by law if you do not accept this License. Therefore, by
+modifying or distributing the Program (or any work based on the
+Program), you indicate your acceptance of this License to do so, and
+all its terms and conditions for copying, distributing or modifying
+the Program or works based on it.
+
+ 6. Each time you redistribute the Program (or any work based on the
+Program), the recipient automatically receives a license from the
+original licensor to copy, distribute or modify the Program subject to
+these terms and conditions. You may not impose any further
+restrictions on the recipients' exercise of the rights granted herein.
+You are not responsible for enforcing compliance by third parties to
+this License.
+
+ 7. If, as a consequence of a court judgment or allegation of patent
+infringement or for any other reason (not limited to patent issues),
+conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot
+distribute so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you
+may not distribute the Program at all. For example, if a patent
+license would not permit royalty-free redistribution of the Program by
+all those who receive copies directly or indirectly through you, then
+the only way you could satisfy both it and this License would be to
+refrain entirely from distribution of the Program.
+
+If any portion of this section is held invalid or unenforceable under
+any particular circumstance, the balance of the section is intended to
+apply and the section as a whole is intended to apply in other
+circumstances.
+
+It is not the purpose of this section to induce you to infringe any
+patents or other property right claims or to contest validity of any
+such claims; this section has the sole purpose of protecting the
+integrity of the free software distribution system, which is
+implemented by public license practices. Many people have made
+generous contributions to the wide range of software distributed
+through that system in reliance on consistent application of that
+system; it is up to the author/donor to decide if he or she is willing
+to distribute software through any other system and a licensee cannot
+impose that choice.
+
+This section is intended to make thoroughly clear what is believed to
+be a consequence of the rest of this License.
+
+ 8. If the distribution and/or use of the Program is restricted in
+certain countries either by patents or by copyrighted interfaces, the
+original copyright holder who places the Program under this License
+may add an explicit geographical distribution limitation excluding
+those countries, so that distribution is permitted only in or among
+countries not thus excluded. In such case, this License incorporates
+the limitation as if written in the body of this License.
+
+ 9. The Free Software Foundation may publish revised and/or new versions
+of the General Public License from time to time. Such new versions will
+be similar in spirit to the present version, but may differ in detail to
+address new problems or concerns.
+
+Each version is given a distinguishing version number. If the Program
+specifies a version number of this License which applies to it and "any
+later version", you have the option of following the terms and conditions
+either of that version or of any later version published by the Free
+Software Foundation. If the Program does not specify a version number of
+this License, you may choose any version ever published by the Free Software
+Foundation.
+
+ 10. If you wish to incorporate parts of the Program into other free
+programs whose distribution conditions are different, write to the author
+to ask for permission. For software which is copyrighted by the Free
+Software Foundation, write to the Free Software Foundation; we sometimes
+make exceptions for this. Our decision will be guided by the two goals
+of preserving the free status of all derivatives of our free software and
+of promoting the sharing and reuse of software generally.
+
+ NO WARRANTY
+
+ 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
+FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
+OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
+PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED
+OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS
+TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE
+PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING,
+REPAIR OR CORRECTION.
+
+ 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
+WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
+REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES,
+INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING
+OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED
+TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY
+YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER
+PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGES.
+
+ END OF TERMS AND CONDITIONS
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..9de65aa
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,70 @@
+# ----- start of system dependent section -----
+
+INSTALL = cp -p
+
+SUNFLAG = # -D_SUN on Solaris machines
+IPV6FLAG = # -DIPV6 for IPv6
+DEBUG = # -g -ggdb
+CFLAGS = -O ${DEBUG} -Wall ${SUNFLAG} ${IPV6FLAG} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64
+#LIBS = -lsocket # for Solaris
+LIBS = # there is no special lib needed, unless your system put the lib in non-standard place
+
+# The directory to install mrsync and others in.
+bindir = /usr/local/bin
+
+# ----- end of system dependent section -------
+
+CLEANFILES = *.o *~
+
+PROGS = multicaster multicatcher rtt rttcatcher trFilelist
+SCR = mrsync.py mrsync_config.py cmdToTarget.py
+OBJ1 = multicaster.o multicatcher.o \
+ parse_synclist.o sends.o complaints.o \
+ complaint_sender.o page_reader.o file_operations.o backup.o \
+ set_catcher_mcast.o set_mcast.o
+
+OBJ4 = rtt.o rttsends.o rttcomplaints.o \
+ rttcatcher.o rttpage_reader.o rttcomplaint_sender.o rttmissings.o
+
+all: ${PROGS}
+
+install: ${PROGS}
+ ${INSTALL} ${PROGS} ${SCR} ${bindir}
+
+# common files
+signal.o: signal.h
+
+# multicasting
+${OBJ1}: main.h proto.h
+
+multicaster: multicaster.o global.o setup_socket.o set_mcast.o \
+ parse_synclist.o \
+ sends.o complaints.o backup.o \
+ timing.o signal.o id_map.o
+ ${CC} ${CFLAGS} -o $@ $^ ${LIBS}
+
+multicatcher: multicatcher.o global.o setup_socket.o set_catcher_mcast.o \
+ page_reader.o complaint_sender.o \
+ file_operations.o signal.o timing.o
+ ${CC} ${CFLAGS} -o $@ $^ ${LIBS}
+
+# for rtt and rttcatcher
+${OBJ4}: rttmain.h rttproto.h
+
+rtt: rtt.o setup_socket.o set_mcast.o \
+ rttsends.o rttcomplaints.o timing.o signal.o
+ ${CC} ${CFLAGS} -o $@ $^ ${LIBS}
+
+rttcatcher: rttcatcher.o setup_socket.o set_catcher_mcast.o \
+ rttpage_reader.o rttcomplaint_sender.o rttmissings.o \
+ signal.o timing.o
+ ${CC} ${CFLAGS} -o $@ $^ ${LIBS}
+
+# misc
+trFilelist: trFilelist.o
+ ${CC} ${CFLAGS} -o $@ $^ ${LIBS}
+
+# to clean up
+clean:
+ rm -f ${PROGS} ${CLEANFILES}
+
diff --git a/Makefile.Sun b/Makefile.Sun
new file mode 100644
index 0000000..8fe57da
--- /dev/null
+++ b/Makefile.Sun
@@ -0,0 +1,71 @@
+# ----- start of system dependent section -----
+
+INSTALL = cp -p
+
+SUNFLAG = -D_SUN # -D_SUN on Solaris machines
+IPV6FLAG = # -DIPV6 for IPv6
+DEBUG = # -g -ggdb
+CC = gcc-2.95.3.ren # 32-bit compiler
+CFLAGS = -O ${DEBUG} -Wall ${SUNFLAG} ${IPV6FLAG} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64
+LIBS = -lsocket # for Solaris
+#LIBS = /usr/local/mtools/nova/lib/lea.Linux.o # for monster clusters (32-bit lib)
+
+# The directory to install mrsync and others in.
+bindir = /usr/local/bin
+
+# ----- end of system dependent section -------
+
+CLEANFILES = *.o *~
+
+PROGS = multicaster multicatcher rtt rttcatcher trFilelist
+SCR = mrsync.py mrsync_config.py cmdToTarget.py
+OBJ1 = multicaster.o multicatcher.o \
+ parse_synclist.o sends.o complaints.o \
+ complaint_sender.o page_reader.o file_operations.o backup.o \
+ set_catcher_mcast.o set_mcast.o
+
+OBJ4 = rtt.o rttsends.o rttcomplaints.o \
+ rttcatcher.o rttpage_reader.o rttcomplaint_sender.o rttmissings.o
+
+all: ${PROGS}
+
+install: ${PROGS}
+ ${INSTALL} ${PROGS} ${SCR} ${bindir}
+
+# common files
+signal.o: signal.h
+
+# multicasting
+${OBJ1}: main.h proto.h
+
+multicaster: multicaster.o global.o setup_socket.o set_mcast.o \
+ parse_synclist.o \
+ sends.o complaints.o backup.o \
+ timing.o signal.o id_map.o
+ ${CC} ${CFLAGS} -o $@ $^ ${LIBS}
+
+multicatcher: multicatcher.o global.o setup_socket.o set_catcher_mcast.o \
+ page_reader.o complaint_sender.o \
+ file_operations.o signal.o timing.o
+ ${CC} ${CFLAGS} -o $@ $^ ${LIBS}
+
+# for rtt and rttcatcher
+${OBJ4}: rttmain.h rttproto.h
+
+rtt: rtt.o setup_socket.o set_mcast.o \
+ rttsends.o rttcomplaints.o timing.o signal.o
+ ${CC} ${CFLAGS} -o $@ $^ ${LIBS}
+
+rttcatcher: rttcatcher.o setup_socket.o set_catcher_mcast.o \
+ rttpage_reader.o rttcomplaint_sender.o rttmissings.o \
+ signal.o timing.o
+ ${CC} ${CFLAGS} -o $@ $^ ${LIBS}
+
+# misc
+trFilelist: trFilelist.o
+ ${CC} ${CFLAGS} -o $@ $^ ${LIBS}
+
+# to clean up
+clean:
+ rm -f ${PROGS} ${CLEANFILES}
+
diff --git a/README b/README
new file mode 100644
index 0000000..d665370
--- /dev/null
+++ b/README
@@ -0,0 +1,302 @@
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+20060416 version 3.0
+ This is a major update from previous version 2.1. New features include:
+ -- large file support
+ multicaster can now transfer file whose size is larger than 2**31-1.
+ -- platform independence (between linux, unix)
+ We have tested this independence from a Sun to a Linux machine.
+ -- catching slow machine as the feedback monitor dynamically
+ This significantly reduces the number of occurence of SIT-OUT-RECEIVING-FILE cases.
+ -- removing meta-file-info
+ Since our basic algorithm is different from that of Aaron's, the meta-file-info
+ is redundant for in our case.
+ -- backup feature (as in rsync)
+ If the target machines are NFS data servers, backup feature is absolutely
+ necessary. The feature is implemented in the low level multica* codes,
+ I will incorporate it into mrsync.py in the next month or so.
+ -- mcast IP address and port options
+ They are now tested and are working.
+ This allows multiple multicasting sessions running simultaneously.
+ -- code cleanup in both mrsync.py and *.c
+ -- code fixing for 64bit arch;
+ tested on Debian 64 bit arch by Nicolas Marot in France
+ -- logic flaw which under certain condition
+ caused premature dropout due to
+ unsuccessful EOF, CLOSE_FILE
+ and caused unwanranted SIT-OUT case
+ -- code provision for IPv6 (but not tested yet)
+version 3.1.1
+ -- minor bug fixes.
+ -- add checking in multicatcher to check if the
+ system is ready for write.
+ -- codes for IPv6 are ready (but not tested)
+ IPv4 is tested ok.
+version 3.2.0
+ -- monitor change improvement
+ -- handshake improvement (e.g. seq #)
+ -- if one machine skips a file, all will NOT close()
+version 4.0.0
+ -- previously, missing page report is sent back one page
+ at a time. I change it to ~60K pages at a time. This
+ drastically reduces the network traffic.
+ Performance (syncing time) is in general improved.
+ -- clean up scheme in choosing a monitor machine.
+ -- misc code cleaning up and minor adjustments.
+
+version 4.0.1
+ -- Dave Close at ThalesGroup in CA points out
+ the type of c in "c = getopt()" should have been "int c"
+ [ It had been "char c". ]
+
+The mrsync package includes the following files:
+--------------------
+README -- this file (usage example in the end.)
+README.more -- another readme.
+GPL.License -- legal stuff
+mrsync.py -- the driver script that glues everything together
+mrsync_config.py -- system dependent configuration files
+cmdToTarget.py -- module to send command to remote machines
+Makefile.Sun
+Makefile.linux
+backup.c
+complaint_sender.c
+complaints.c
+file_operations.c
+global.c
+id_map.c
+multicaster.c
+multicatcher.c
+page_reader.c
+parse_synclist.c
+rtt.c
+rttcatcher.c
+rttcomplaint_sender.c
+rttcomplaints.c
+rttmissings.c
+rttpage_reader.c
+rttsends.c
+sends.c
+set_catcher_mcast.c
+set_mcast.c
+setup_socket.c
+signal.c
+timing.c
+trFilelist.c
+main.h
+rttproto.h
+proto.h
+signal.h
+rttmain.h
+
+------------
+To install,
+------------
+(1) copy these files to a directory on the target and
+ master machines.
+ (All target machines need at least a copy of the
+ multicatcher.)
+ Or the best (efficient) solution is to install the package
+ on a common file server to which all machines can access.
+(2) Depending on which platform you are in,
+ mv Makefile.Sun (or MakeFile.linux) into Makefile.
+(3) edit the system dependent section in Makefile.
+ Especially, the bindir variable.
+ It is used in the 'install' to copy the executable
+ binaries into the destination directory.
+(4) make
+ to compile and generate the executables. (the warnings for format are harmless).
+(5) make install
+ to copy the executables into $bindir.
+(6) edit mrsync_config.py
+ change the paths for your system.
+ mrsync_config.py is read by mrsync.py
+(7) check if your system has rsync installed
+ in the directory specified in rsyncPath and remote_rsyncPath
+ in the file mrsync_config.py
+(8) check if your system has python installed.
+ Python, like perl, is included in most linux distribution.
+ As distributed, mrsync.py invokes python through the 1st line
+ !/usr/bin/env python
+ If this does not work for you system, you replace
+ the 1st line of mrsync.py with correct one.
+
+------------------------------
+Usage of mrsync.py
+------------------------------
+
+(1) the function of ping --
+ This is removed from multicaster.
+ Instead, please use rtt which gives more useful info.
+ See 'Usage of rtt' below.
+
+(2) mrsync.py -m /dirX/target.list
+ -s /dirY/data
+ -t /dirZ/data
+
+ This is a routine job carried out at Renaissance Technologies Corp.
+ The file target.list lists the names of all target machines (one name per line).
+
+ Before carrying out multicasting, mrsync.py invokes rsync,
+ to find those files in the source directory (-s),
+ compared with the target directory (-t) in the first target machine,
+ that need to be transfered to all target machines.
+ If -t is not specified, then mrysnc assumes that the dest_path
+ on target machines is same as that in -s.
+
+ mrsync.py uses rsync with the following minimum options:
+ --rsh=rsh -avW --dry-run --delete
+ This should work in most of routine cases.
+ This minumum option is specified in mrsync_config.py.
+ I did not test if other combination of options will work
+ so I would advise that you do not change the minimum option.
+
+ However, you can add additional options on top of that minimum
+ options. Use '-o' in mrsync.py
+ e.g.
+ mrsync2.py -m list -s /path/sync/test -t /dest/sync/test -v 1 -o '-H'
+ This turns on --hard-link option in rsync. But hardlink is very tricky
+ to deal with. The minumum options supplied in mrsync_config.py is the one
+ that we find works pretty good in terms of dealing with hardlinks.
+ I do find that some rare sequence of syncing sessions requires that I repeat
+ another round of mrsync.py with additional
+ -o '-H' to set the hardlinks on the targets.
+
+ The output of rsync is stored in a tmp file, which in turn gets
+ parsed by trFilelist. trFilelist translates the list of files into
+ a specific format for multicaster.
+ The reason for this two step process is to remove direct dependence of
+ multicaster on rsync.
+ The gain is that we can now do the case as shown in (3).
+
+(3) mrsync.py -m machine1,machine2
+ -s /dirB/data/
+ -l *.c,subs/foo*,sub2/path/[0-9][0-9].ext
+
+ Here have two new things as compared with the case in (2).
+ First, the two target machines are listed directly
+ in -m option in the form of csv list
+ (instead of listing them in a machine_list file).
+ Second, the -l option is specified.
+ -l option lists the files that we want to be synced.
+ i.e. with this option, we bypass the rsync.
+ In this example, we want to sync
+ all /dirB/data/*.c files,
+ all /dirB/data/subs/foo* files,
+ and all /dirB/data/sub2/path/[0-9][0-9].ext
+ The pattern specified should be as those recognizable by 'ls'.
+
+ Obviously, this usage is convenient for syncing small number of
+ specific files to a couple of machines.
+
+(4) mrsync.py -m /dirA/target_list
+ -s /dirB/data
+ -A 239.255.70.88
+ -P 8010
+
+ Same functionality as in (2).
+ But here we specify the multicast IP address and port number
+ for use by both multicatcher and multicaster.
+
+ This allows multiple mrsync sessions to run simultaneously
+ as long as each session uses its own mcast_address and port_number.
+ [ No overlap between sessions. ]
+
+(5) mrsync.py -m /dirA/target_list
+ -s /path
+ -X
+ Same functionality as in (2).
+ But (2) is a normal sync. This one invokes the -X.
+ In a normal sync, a given file (let's say /path/foo) is synced
+ with two steps,
+ (a) sync the file to /path/foo.tmp
+ (b) mv /path/foo.tmp /path/foo
+ With -X option, the same file is synced with three steps,
+ (a) rm /path/foo
+ (b) sync the file to /path/foo.tmp
+ (c) mv /path/foo.tmp /path/foo
+ This is useful when the target disk is almost full, and
+ could not afford to have foo.tmp and foo co-existing.
+
+(6) By default, mrsync.py prints out only very essential messages.
+ If you want to see more messages (related to some detailed status including
+ a rtt_histogram at the end of the session),
+ you can spcify '-v 1' in the mrsync.py command line argument list.
+ '-v 2' will print out even more detailed messages --- for debugging purpose!
+
+(7) other options of mrsync.py
+ type 'mrsync.py' and see them for yourself.
+
+--------------------------------------------------------------------------------
+Usage of rtt
+----------------
+rtt is a utility for testing between master and one machine.
+Its main function is to report the (histogram) distribution of the
+round-trip-time interval for pages sent.
+I use it to ping a machine
+and to measure the typical rtt time and to see if the network is
+connected properly. System administrators can use the rtt time info
+to turn up the network.
+
+Example usage:
+
+(1) on master, I invoke rtt and it gives the report as follows.
+
+master:~ 1>rtt -r ssh -m target -n 1000 -s 64000
+
+using ssh to invoke rttcatcher on target
+Sun Apr 16 17:36:58 EDT 2006
+ssh target '/path/rttcatcher -a 239.255.67.200 -p 7900 1>/dev/null 2>/dev/null & echo $!'
+remote pid = 27283
+Sending data...
+Missing pages = 0 ( 0.00%) out of total pages = 1000
+rtt histogram
+msec counts
+---- --------
+ 1 989
+ 2 3
+ 3 7
+ 4 1
+
+
+The option -m specifies the name of the target machine.
+-n specifies the number of pages that we want to send.
+-s specifies the size of each page. (maximum = 64512)
+
+In this example, the total number of pages sent is 1000
+which is the sum of the counts in the rtt historgram table.
+The histogram shows that the majority of rtt is around 1-2 msec.
+If the network is really busy, the distribution of the histogram
+will be broadened (instead of concentrated around one particular value).
+
+(2) more options for rtt.
+ type 'rtt' and see them on the screen.
+
+--------------------------------------------------------------------------------
+
+HP Wei
+hp@rentec.com
+Renaissance Technologies Corp.
+
+
diff --git a/README.more b/README.more
new file mode 100644
index 0000000..3a8f7e1
--- /dev/null
+++ b/README.more
@@ -0,0 +1,353 @@
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
++------------+
+| CHANGE LOG |
++------------+
+Jul - Oct 2008 -- version 4.0
+ -- previously, missing page report is sent back one page
+ at a time. I change it to ~60K pages at a time. This
+ drastically reduces the network traffic.
+ -- clean up scheme in choosing a monitor machine.
+Feb - May 2006 -- version 3.0
+ verision 3.0.0 major update
+ -- large file support
+ -- platform independence (between linux, unix)
+ -- backup feature (as in rsync)
+ -- removing meta-file-info
+ We transmit the file_stat info right before
+ the file is being synced. And the file-stat info
+ is transfered by ascii format to avoid byte-ordering
+ translation.
+ The orignal meta_data is not necessay under this scheme.
+ -- catching slow machine as the feedback monitor
+ [ the signal_handler scheme in version 2.0 is
+ removed. ]
+ -- mcast options
+ version 3.0.[1-9] bug fixes
+ -- logic flaw which under certain condition
+ caused premature dropout due to
+ unsuccessful EOF, CLOSE_FILE
+ and caused unwanranted SIT-OUT cases.
+ -- tested on Debian 64-bit arch by Nicolas Marot in France
+ Nicolas also translated mrsync.py to mrsync.sh
+ (nicolas.marot@gmail.com)
+ version 3.1.0
+ -- codes for IPv6 are ready (but not tested)
+ IPv4 is tested ok.
+
+June 9, 2005 -- version 2.1
+ Clint Byrum at clint@careercast.com asks about the exit status
+ of multicaster, which is changed so that it exits whenever
+ there are bad_machines or files_sat_out.
+May 2005 -- Second version
+ The major changes are:
+ (1) implementing congestion control so that mrsync is more
+ net-traffic-friendly.
+ (2) using python script as the glue to put everything together.
+ i.e. the previous mrsync.c is replaced by mrsync.py
+ Other changes collected over the years through interactions with
+ users include:
+ (1) adding options to change the default IP address for multicast,
+ and the default port number for flow control.
+ This was suggested by Robert Dack <robd@kelman.com>
+ (2) replacing memory mapped file IO with the usual
+ seek() and write() sequence.
+ This change was echoed by Clint Byrum <clint@careercast.com>
+ (3) adding verbose control so that by default mrsync prints
+ only essential info instead of detailed status report.
+ This was suggested by Clint Byrum <clint@careercast.com>
+
+Jan 2002 -- First version uploaded to http://freshmeat.net/projects/mrsync/
+ The tar file is in ftp://felder.org/pub/mrsync.tar
+
++-----------------+
+| MRSYNC vs RSYNC |
++-----------------+
+mrsync is a package that consists of utilities for transfering
+a bunch of files from a master machine to multiple target
+machines simultaneously
+by using the multicasting capability in the UNIX system.
+The name 'mrsync' is inspired by the
+popular utility 'rsync' for synchronizing files between
+two machines. However, beyond this similarity in the
+functionality, mrsync is fundamentally different from rsync
+in three areas.
+(1) rsync uses TCP while mrsync needs UDP in order
+ to use the multicasting part of UNIX's socket communication.
+ The former limits the data commuinication to one-to-one-machine
+ whereas the latter allows one-to-many.
+ UDP has, however, no built in flow control. As a result,
+ the major part of mrsync
+ (more precisely, the multicaster and multicatcher),
+ is devoted to synchronizing the data flow.
+(2) For a given file,
+ rsync transfers optionally only those parts in the file
+ that are different
+ in the two versions on the master and the target machine.
+ This saves time, esp on a slow network.
+ mrsync, in contrast, transfers the whole content of a file
+ to all targets at one time. The time gain of mrsync comes from
+ its concurrency as described in (3).
+(3) Replicated data servers have become poplular to serve
+ thousands of clients. Using rsync to replicate the data
+ to hundreds of disks is very time consuming. The total
+ time it takes is proportional to the number of target
+ disks. In contrast, mrsync scales much better because
+ it puts the contents of the file on the network only one
+ time. e.g. we have used mrsync to transfer 140GB of data
+ to 100 machines in a 1Gbit LAN in about 4-6 hours.
+
+This project has started in 2001. We have been using the tool
+every day.
+Recently, I have upgraded this tool to handle large-files,
+to make it platform independent, and to make it 64bit ready.
+The current version is 4.0.0 which is stable.
+It has been used by many people in the UNIX community.
+Lately, people in France has helped me test the code on 64bit arch.
+They are proposing to make it a standard package in the Debian distribution.
+
+mrsync was originally posted on Freshmeat.net. But the link there
+points to one of our company's disks. We would like to find a place
+on SourceForge.net for this package.
+
+
+
++-------------------+
+| HISTORY OF MRSYNC |
++ ------------------+
+The project of mrsync stemmed from the necessity to transfer
+many files to hundreds of machines running Linux at Renaissance
+Technologies Corp. Looking into the Open Source Community, we found
+a preliminary utility codes of multicasting written by Aaron Hillegass.
+Many unsuccessful test-runs on a huge amount of data files, however,
+led us to embark on an overhaul of the code.
+Most of the following items were inherited and bug-fixed from
+the original codes.
+* The low level functions that
+ interact with UNIX's multicasting sockets.
+* Meta_data -- the essential info about a file which the master
+ machine will first transmit to the target machines.
+ [ removed in 2006 (see above change log)].
+* Division of a file into many 'pages'.
+* The idea of maintaining a missing page flag.
+* The idea of a multicaster and multicatcher loop.
+
+In this mrsync, we develop two new critical elements:
+flow-control message communication conducted by the multicaster,
+and a four-state page reader (processor) in the multicatcher.
+The former is to synchronize the task each machine is performing.
+For example, the master will not start sending
+the pages of a file unless all machines have acknowledged
+the completion of openning the disk i/o for the file.
+In order to accomodate these elements, the codes have been
+changed significantly from the original version.
+For example, the multicatcher now never asks for slowing down.
+And multicaster sends data on a file-by-file basis.
+The file integrity is achieved by orchestrating the
+data flow which is closely monitored and conducted
+by the master machine.
+
+[200505] we add congestion control into the code.
+After the master sends one page for a file, it will not send
+the next page until it receives the acknowledgement (ack) message
+sent by a monitor target (a feedbacker if you will).
+This simply-minded congestion control prevents sending pages
+at a pace with which a busy network cannot digest.
+The feedbacker is chosen at the initialization stage of multicaster.
+It may be changed by multicaster whenever it fails to send back
+ack message within a certain duration.
+
+[2006] The monitor machine will send back ack message
+only after it has written the page to disk. This way the 'rtt'
+calculated by the master machine includes the disk IO time,
+instead of just the network-traffic time. The master also
+selects the slowest target machine as the monitor machine.
+These two ingredients provide more precise picture of
+how busy the whole system is, and thus tend to leave no target
+machine behind.
+
+As of today, mrsync has been in full use at Renaissance
+on a regular basis.
+
++-------------------+
+| TEST RUNNING TIME |
++-------------------+
+[200810] using version 4.0.0. The performance improves (if the target machines
+ are not busy).
+ Number of targets = 32. All linux machines on 1Gbit network.
+
+Total number of files = 283 Pages w/o ack = 0 ( 0.00%)
+Total number of pages = 100129 Pages re-sent = 6852 ( 6.84%)
+Total number of bytes = 6449407171 Bytes re-sent = 442030792 ( 6.85%)
+Total time spent = 2.55 (min) ~ 0.37 (min/GB)
+
+Send pages time = 1.78 (min) ~ 0.26 (min/GB)
+
+rtt histogram
+msec counts
+---- --------
+ 0 103415
+ 1 1985
+ 2 162
+ 3 73
+ 4 145
+ 5 68
+ 6 253
+ 7 203
+ 8 305
+ 9 184
+ 10 156
+
+[200604] using version 3.x.x, here is an example of the final statistics
+ in one of our routine syncing jobs.
+ Number of targets = 32. All linux machines on 1Gbit network.
+
+total number of files = 4076
+Total number of pages = 480719 Pages re-sent = 39994 ( 8.32%)
+Total number of bytes = 30875638130 Bytes re-sent = 2573924662 ( 8.34%)
+Total time spent = 39.50 (min) ~ 1.18 (min/GB)
+
+rtt histogram
+msec counts
+---- --------
+ 0 371023
+ 1 118874
+ 2 21977
+ 3 3779
+ 4 2226
+ 5 760
+ 6 584
+ 7 580
+ 8 288
+ 9 162
+ 10 92
+ ...
+
+[200201]
+25 minutes for a group of files whose total size amounts to 4.6GB.
+(This data is obtained from running on 5 SUN machines
+ with Solaris 8 on an Ethernet LAN whose bandwidth is 100Mbits/sec.)
+
++-------------------------------------+
+| MAIN ALGORITHM FOR SENDING ONE FILE |
++-------------------------------------+
+In the multicaster code running on the master machine:
+In the initialization stage, the code selects one target machine
+as a monitor machine (feedbacker), which sends back acknowledgement
+message when one page is received.
+The main loop for multicaster is as follows.
+(1) Send OPEN_FILE command (message).
+(2) Wait for all machine to send back acknowledgement (ack).
+(3) If any machine does not ack back within a certain time period,
+ that machine is considered bad and is dropped out
+ of the monitoring list.
+ During that waiting time period, the master sends the
+ OPEN_FILE message to those potentially bad machines regularly.
+(4) Start sending pages,
+ (all pages, in the first time round;
+ those missing pages reported back by target machines, in other rounds).
+ During this period, all target machines except the feedbacker, if activated,
+ just receive and process whatever pages that arrive at the receive buffer.
+
+ There are two modes of operation for the master to send these pages.
+ (a) When the congestion control is turned off (-x option in mrsync.py)
+ the master sends out one page and wait for a duration (interpage interval),
+ which is specified as DT_PERPAGE in main.h, before it sends the next one.
+ (b) If the congestion control is turned on (the default behavior of mrsync.py),
+ the master will not send the next page until it receives the ack.
+ On the feedbacker, once a SIGIO is triggered indicating the arrival
+ of a page, it sends back ack message to the master.
+(5) Send EOF message to signal the end of transmission
+ for this file.
+(6) Wait for all machines to send back ack.
+ They either report ok
+ or report a list of page numbers that are missing.
+(7) If any machine does not ack back within a certain time period,
+ that machine is considered bad and is dropped out
+ of the monitoring list.
+ During that waiting time period, the master sends the
+ EOF message to those potentially bad machines regularly.
+(8) If there are missing pages, go back to (4).
+(9) Send CLOSE_FILE message.
+(10) Wait for all machine to send back ack.
+(11) If any machine does not ack back within a certain time period,
+ that machine is considered bad and is dropped out
+ of the monitoring list.
+ During that waiting time period, the master sends the
+ CLOSE_FILE message to those potentially bad machines regularly.
+(12) If there are more files to transfer, go back to (1).
+
+
+In multicatcher running on the target machine:
+the main loop performs the following steps,
+(1) Start with IDLE_STATE.
+(2) Upon receiving OPEN_FILE and being in IDLE_STATE,
+ set up a momory mapped temporary file whose size
+ equals that specified in the meta_data,
+ which has been received in prior time.
+ if things are not successful,
+ don't send back ack.
+ else
+ send back ack and enter GET_DATA_STATE.
+(3) Upon receiving one UDP and being in GET_DATA_STATE,
+ store it into the right place in the temporary file.
+ Expect to receive more UDP data and go back to (3).
+(4) Upon receiving EOF and being in GET_DATA_STATE,
+ if there are some missing pages,
+ report them and expect to go back to (3).
+ else there are no missing page,
+ send back ack and enter DATA_READY_STATE.
+ if there is sick conditions,
+ send back ack and enter SICK_STATE.
+(5) Upon receiving CLOSE_FILE and being in DATA_READY_STATE,
+ if in DATA_READY_STATE,
+ rename(temporary_file, the_real_filename),
+ clean up the memory mapped area.
+ if things go well,
+ send back the ack, enter IDLE_STATE
+ and expect to go back to (1),
+ else don't send back ack.
+ else if in SICK_STATE,
+ send back sit_out ack, enter IDLE_STATE
+ and expect to go back to (1).
+
+In addition to the above main loop that runs on all target machines,
+the selected monitor machine (feedbacker) needs to send back
+an received_ack message for every page received and processed.
+
+--------------------------------------------------------------
+Note: The original version dealt with three types of 'files':
+ directory, softlink and regular file.
+ mrsync includes one more: hardlink file.
+
+HP Wei
+hp@rentec.com
+Renaissance Technologies Corp.
+Nov 15, 2001 (first version)
+May 20, 2005 (second version)
+Apr 16, 2006 (third version)
+Oct 28, 2008 (fourth version)
+
+
+
diff --git a/backup.c b/backup.c
new file mode 100644
index 0000000..3c59a9d
--- /dev/null
+++ b/backup.c
@@ -0,0 +1,108 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <regex.h> /* use POSIX in order to be portable to linux */
+
+extern int verbose;
+int backup = FALSE;
+char *pattern_baseDir = NULL;
+
+/* --------- for syncing all files with/without backup. */
+int nPattern=0; /* number of regular expression for backup files */
+regex_t ** fPatterns;/* array of pointers to regular expression */
+
+int set_nPattern(char * fpat_file)
+{
+ FILE *fd;
+ char pat[PATH_MAX];
+ int count = 0;
+ if ((fd = fopen(fpat_file, "r"))==NULL) {
+ fprintf(stderr, "Cannot open file -- %s\n", fpat_file);
+ return FAIL;
+ }
+ while (!feof(fd)) {
+ if (fscanf(fd, "%s", pat) == 1) {
+ ++count;
+ }
+ }
+ nPattern = count;
+ fclose(fd);
+ return SUCCESS;
+}
+
+int read_backup_pattern(char * fpat_file)
+{
+ FILE *fd;
+ char pat[PATH_MAX];
+ int i = 0;
+
+ if (!set_nPattern(fpat_file)) return FAIL;
+
+ fPatterns = malloc(sizeof(void *)*nPattern);
+ for(i = 0; i< nPattern; ++i) {
+ fPatterns[i] = (regex_t *) malloc(sizeof(regex_t));
+ }
+
+ fd = fopen(fpat_file, "r");
+
+ /*
+ if we don't prepend ^,
+ then pattern 'file' intended for files under srcBase
+ will select files with pattern = subdir/file.*
+ which is not our intention.
+ */
+ i = 0;
+ while (!feof(fd)) {
+ if (fscanf(fd, "%s", pat) == 1) {
+ char fullpat[PATH_MAX];
+ sprintf(fullpat, "^%s", pat);
+
+ regcomp(fPatterns[i], fullpat, REG_EXTENDED|REG_NOSUB);
+ ++i;
+ }
+ };
+ fclose(fd);
+ return SUCCESS;
+}
+
+int needBackup(char * filename) /* fullpath */
+{
+ /* we reach this point when the backup flag is true
+ if no_pattern -> backup all of them
+ if pattern
+ if match -> backup
+ nomatch -> no-backup
+ */
+ int i;
+ char *p;
+ if (!backup) return FALSE;
+ if (nPattern==0) return TRUE;
+
+ p = filename + strlen(pattern_baseDir) + 1; /* +1 to get pass the / after basedir */
+ /* fprintf(stderr, "nPattern = %d file= %s\n", nPattern, p); ********/
+ for(i=0; i<nPattern; ++i) {
+ if (regexec(fPatterns[i], p, (size_t)0, NULL, 0)!=0)
+ continue; /* no match */
+ return TRUE;
+ }
+ return FALSE;
+}
+
diff --git a/cmdToTarget.py b/cmdToTarget.py
new file mode 100755
index 0000000..ccf042a
--- /dev/null
+++ b/cmdToTarget.py
@@ -0,0 +1,43 @@
+
+# send cmds to a target machine:
+# docmd(machine, cmd)
+# return (1, output) when succeeds, (0, []) when fails
+
+import os, time, popen2, tempfile;
+
+secsForWait = 30;
+def waitForJob(p, machine):
+ count = 0;
+ while (p.poll()):
+ time.sleep(1);
+ count += 1;
+ if count == secsForWait:
+ killJobs = "kill -9 `ps -ef | grep %s | egrep -v 'grep|python' "\
+ "| awk '{ print $2}'` 2>/dev/null" % machine;
+ os.system(killJobs);
+ return 0;
+ return 1;
+
+def docmd(rsh, machine, cmd):
+ # if scheme tmpfile is not used, p will fail to return if the
+ # output of cmd is large
+ tmpFile = tempfile.mktemp();
+ p = popen2.Popen3("%s %s '%s' > %s 2>/dev/null" % (rsh, machine, cmd, tmpFile));
+
+ if not waitForJob(p, machine): return (0, []);
+
+ lines = open(tmpFile).readlines();
+ os.remove(tmpFile);
+ return (1, lines);
+
+# check if the machine can be talked to by rsh
+def isMachineOK(rsh, machine):
+ p = popen2.Popen3("%s %s 'date 2>/dev/null'" % (rsh, machine));
+
+ if not waitForJob(p, machine): return False;
+
+ lists = p.fromchild.readlines();
+
+ if len(lists)==0: return False;
+ return True;
+
diff --git a/complaint_sender.c b/complaint_sender.c
new file mode 100644
index 0000000..53cfdb4
--- /dev/null
+++ b/complaint_sender.c
@@ -0,0 +1,158 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+
+/* complaint_send_socket */
+int complaint_fd;
+#ifndef IPV6
+struct sockaddr_in complaint_addr;
+#else
+struct sockaddr_in6 complaint_addr;
+#endif
+
+extern int my_FLOW_PORT;
+extern int verbose;
+
+int seq = 0;
+
+/* send buffer */
+char complaint_buffer[FLOW_BUFFSIZE];
+int *ccode_ptr; /* complain code ---- see main.h */
+int *cmid_ptr; /* which machine*/
+int *cfile_ptr; /* which file -- for missing page */
+int *npage_ptr; /* # of pages -- for missing page */
+int *pArray_ptr; /* missing page arrary */
+int *fill_ptr; /* point to next array element */
+
+/* ----------------------------------------------------------------
+ routines to fill in pArray with the missing page indexes
+ --------------------------------------------------------------- */
+void fill_in_int(int i)
+{
+ *fill_ptr++ = htonl(i);
+}
+
+void init_fill_ptr()
+{
+ fill_ptr = pArray_ptr;
+}
+
+/*----------------------------------------------------------
+ init_complaint_sender initializes the buffer to allow the
+ catcher to send complaints back to the sender.
+
+ ret_address of sender to whom we will complain
+ is determined when we receive the first UDP data
+ in read_handle_page() in page_reader.c
+ ----------------------------------------------------------*/
+void init_complaint_sender() /* (struct sockaddr_in *ret_addr) */
+{
+ /* ret_addr is sent by master, in network-byte-order */
+ if (verbose>=2)
+ fprintf(stderr, "in init_complaint_sender\n");
+ /* init the send_socket */
+ complaint_fd = complaint_socket(&complaint_addr, my_FLOW_PORT);
+
+ /* set up the pointers so we know where to put complaint_data */
+ ccode_ptr = (int *) complaint_buffer;
+ cmid_ptr = (int *)(ccode_ptr + 1);
+ cfile_ptr = (int *)(cmid_ptr + 1);
+ npage_ptr = (int *)(cfile_ptr + 1);
+ pArray_ptr= (int *)(npage_ptr + 1);
+}
+
+#ifndef IPV6
+void update_complaint_address(struct sockaddr_in *sa)
+{
+ sock_set_addr((struct sockaddr *) &complaint_addr,
+ sizeof(complaint_addr), (void*)&sa->sin_addr);
+}
+#else
+void update_complaint_address(struct sockaddr_in6 *sa)
+{
+ sock_set_addr((struct sockaddr *) &complaint_addr,
+ sizeof(complaint_addr), (void*)&sa->sin6_addr);
+}
+#endif
+
+/*------------------------------------------------------------------------
+ send_complaint fills the complaint buffer and send it through our socket
+ back to the sender
+
+ The major use is to tell master machine which pages of which file
+ needs to be re-transmitted.
+ complaint -- the complain code defined in main.h
+ mid -- machine id
+ file -- the file index
+ npage -- # of missing pages
+ followed by an array of missing page index [ page_1, page_2, ... ]
+
+ It is also used for sending back acknoledgement.
+ complaint -- the ack code defined in main.h in the same complaint section.
+ mid -- machine id
+ file -- which file
+ page -- seq number (out of seq complaints will be ignored by the catcher)
+ ------------------------------------------------------------------------*/
+void send_complaint(int complaint, int mid, int page, int file)
+{
+ /* fill in the complaint data */
+ /* 20060323 add converting to network byte-order before sending out */
+ int bytes;
+ *ccode_ptr = htonl(complaint);
+ *cmid_ptr = htonl(mid);
+ *cfile_ptr = htonl(file);
+ if (complaint==MISSING_PAGE || complaint==MISSING_TOTAL) {
+ *npage_ptr = htonl(page);
+ } else {
+ *npage_ptr = htonl(seq++);
+ }
+
+ bytes = (complaint==MISSING_PAGE) ? ((char*)fill_ptr - (char*)ccode_ptr)
+ : (char*)pArray_ptr - (char*)ccode_ptr;
+
+ /* send it */
+ if(sendto(complaint_fd, complaint_buffer, bytes, 0,
+ (const struct sockaddr *)&complaint_addr,
+ sizeof(complaint_addr)) < 0) {
+ perror("Sending complaint\n");
+ }
+ if (verbose>=2)
+ printf("Sent complaint:code=%d mid=%d page=%d file=%d bytes=%d\n",
+ complaint, mid, page, file, bytes);
+}
+
+
+
+
+
+
+
+
diff --git a/complaints.c b/complaints.c
new file mode 100644
index 0000000..fb48da2
--- /dev/null
+++ b/complaints.c
@@ -0,0 +1,579 @@
+/*
+ Copyright (C) 2005-2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+/*#include "paths.h"*/
+#include <sys/types.h>
+#include <time.h>
+#include <setjmp.h>
+#include <signal.h>
+
+extern int monitorID; /* defined in multicaster.c */
+extern int my_FLOW_PORT;
+extern int verbose;
+extern int nPages;
+extern char * cmd_name[];
+extern unsigned int total_pages, real_total_pages;
+extern off_t total_bytes, real_total_bytes;
+
+/* buffer for receiving complaints */
+char flow_buff[FLOW_BUFFSIZE];
+int *code_ptr; /* What's wrong? */
+int *mid_ptr; /* machine id */
+int *file_ptr; /* which file */
+int *npage_ptr; /* # of pages */
+int *pArray_ptr;/* missing page arrary */
+
+/* receive socket */
+int complaint_fd;
+#ifndef IPV6
+struct sockaddr_in complaint_addr;
+#else
+struct sockaddr_in6 complaint_addr;
+#endif
+
+/* status */
+char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */
+int *last_seq; /* array of size nMachines -- seq number of complaints */
+ /* *** watch out the max seq = 2e9 */
+int *total_missing_page; /* array of size nMachines -- persistent thru life of program*/
+char *file_received; /* array of size nMachines */
+char *bad_machines; /* array of size nMachines -- persistent thru life of program*/
+char *machine_status; /* array of size nMachines for ack */
+int *missing_pages; /* array of size nMachines */
+
+int nMachines;
+int has_missing; /* some machines have missing pages for this file (a flag)*/
+int has_sick; /* some machines are sick for this file (a flag)*/
+int skip_count=0; /* number of files that are not delivered */
+int quitWithOneBad=FALSE; /* default: continue with one or more (<all) bad machine */
+
+/* flow control */
+int my_ACK_WAIT_TIMES = ACK_WAIT_TIMES;
+
+/*
+ init_complaints initializes our buffers to receive complaint information
+ from the catchers
+*/
+void init_complaints()
+{
+ int rcv_size;
+
+ if (verbose>=2)
+ fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE);
+
+ /* get pointers set to the right place in buffer */
+ code_ptr = (int *) flow_buff;
+ mid_ptr = (int *) (code_ptr + 1);
+ file_ptr = (int *) (mid_ptr + 1);
+ npage_ptr = (int *) (file_ptr + 1);
+ pArray_ptr= (int *) (npage_ptr+1);
+
+ /* Receive socket (the default buffer size is 65535 bytes */
+ if (verbose>=2) printf("set up receive socket for complaints\n");
+ complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT);
+
+ /*
+ getsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &i, &il);
+ printf(" rcvbuf = %d type = %d\n", i, il);
+ exit(0);
+ the default in our machines -> size = 65535 and type = 4
+ */
+
+ rcv_size = TOTAL_REC_PAGE * FLOW_BUFFSIZE;
+ if (setsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){
+ perror("Expanding receive buffer for init_complaints");
+ }
+}
+
+void init_missing_page_flag(int n)
+{
+ int i;
+ nPages = n;
+ if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) {
+ fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n);
+ perror("error = ");
+ exit(0);
+ }
+ for(i=0; i<nPages; ++i) {
+ missing_page_flag[i] = RECEIVED;
+ }
+}
+
+void page_sent(int page)
+{
+ missing_page_flag[page] = RECEIVED;
+}
+
+void free_missing_page_flag()
+{
+ free(missing_page_flag);
+ missing_page_flag = NULL;
+}
+
+void set_has_missing()
+{
+ has_missing = TRUE;
+}
+
+void reset_has_missing()
+{
+ has_missing = FALSE;
+}
+
+void set_has_sick()
+{
+ has_sick = TRUE;
+}
+
+void reset_has_sick()
+{
+ has_sick = FALSE;
+}
+
+int has_sick_machines()
+{
+ return has_sick;
+}
+
+int has_missing_pages()
+{ /* originally, this fx use missing_page_flag[]
+ That will not work if the master did not receive missing page info */
+ return has_missing;
+}
+
+void refresh_missing_pages()
+{
+ int i;
+ for(i=0; i<nMachines; ++i) missing_pages[i] = 0;
+}
+
+void refresh_machine_status()
+{
+ int i;
+ for(i=0; i<nMachines; ++i) machine_status[i] = NOT_READY;
+}
+
+void mod_machine_status()
+{
+ /* take into account of machines which have received this file */
+ int i;
+ for(i=0; i<nMachines; ++i) {
+ if (file_received[i]==FILE_RECV) machine_status[i]=MACHINE_OK;
+ }
+}
+
+void refresh_file_received()
+{
+ int i;
+ for(i=0; i<nMachines; ++i) file_received[i] = NOT_RECV;
+}
+
+int nNotRecv()
+{
+ int i, c;
+ c = 0;
+ for(i=0; i<nMachines; ++i) {
+ if (file_received[i] == NOT_RECV) ++c;
+ }
+ return c;
+}
+
+int iNotRecv()
+{
+ /* find the first NotRecv machine */
+ int i;
+ for(i=0; i<nMachines; ++i) {
+ if (file_received[i] == NOT_RECV) return i;
+ }
+ return -1;
+}
+
+void init_machine_status(int n)
+{
+ int i;
+ nMachines = n;
+ machine_status = malloc(n * sizeof(char));
+ refresh_machine_status();
+
+ bad_machines = malloc(n * sizeof(char));
+ for(i=0; i<nMachines; ++i) {
+ bad_machines[i] = GOOD_MACHINE;
+ }
+
+ total_missing_page = malloc(n * sizeof(int));
+ for(i=0; i<nMachines; ++i) {
+ total_missing_page[i] = 0;
+ }
+
+ missing_pages = malloc(n * sizeof(int));
+ for(i=0; i<nMachines; ++i) {
+ missing_pages[i] = 0;
+ }
+
+ file_received = malloc(n * sizeof(char));
+ refresh_file_received();
+
+ skip_count = 0;
+
+ last_seq = malloc(n * sizeof(int));
+ for(i=0; i<nMachines; ++i) {
+ last_seq[i] = -1;
+ }
+}
+
+int get_total_missing_pages(int n)
+{
+ return total_missing_page[n];
+}
+
+int read_handle_complaint(int cmd)
+{
+ /*
+ cmd = cmd_code -- each cmd expects different 'response' (complaints)
+ return 1 for complaint handled
+ return 0 for irrelevant complaint
+ return -1 for time-out
+ */
+ int mid_v, code_v, file_v, npage_v, bytes_read;
+
+ if (readable(complaint_fd)) {
+ /* There is a complaint */
+ bytes_read = recvfrom(complaint_fd, flow_buff, FLOW_BUFFSIZE, 0, NULL, NULL);
+
+ /* 20060323 deal with big- vs little-endian issue
+ convert incoming integers into host representation */
+
+ mid_v = ntohl(*mid_ptr);
+
+ if ((bytes_read < FLOW_HEAD_SIZE) || (mid_v < 0 ) ||
+ (mid_v >= (unsigned int) nMachines) || /* boundary check for mid_v for safety */
+ (bad_machines[mid_v] == BAD_MACHINE)) { /* ignore complaint from a bad machine*/
+ return 0;
+ }
+
+ code_v = ntohl(*code_ptr);
+ file_v = ntohl(*file_ptr);
+ npage_v = ntohl(*npage_ptr);
+
+ /* check if the complaint is for the current file */
+ if (code_v != MONITOR_OK && file_v != current_entry()) return 0;
+ /* out of seq will be ignored */
+ if (code_v != MISSING_PAGE && code_v != MISSING_TOTAL) { /********* MISSING_TOTAL ? *************/
+ if (npage_v <= last_seq[mid_v]) return 0;
+ else last_seq[mid_v] = npage_v;
+ }
+
+ switch (code_v) {
+ case PAGE_RECV:
+ /******** check if machineID is the one we have set. */
+ /*if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID);*/
+ if (cmd == SENDING_DATA && mid_v == monitorID)
+ return 1;
+ else
+ return 0;
+
+ case MONITOR_OK:
+ /********* check if machineID is the one we have set. */
+ if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID);
+ if (cmd == SELECT_MONITOR_CMD && mid_v == monitorID)
+ return 1;
+ else
+ return 0;
+
+ case OPEN_OK :
+ if (cmd == OPEN_FILE_CMD) {
+ machine_status[mid_v] = MACHINE_OK;
+ return 1;
+ } else {
+ return 0;
+ }
+
+ case CLOSE_OK :
+ if (cmd == CLOSE_FILE_CMD || cmd == CLOSE_ABORT_CMD) {
+ machine_status[mid_v] = MACHINE_OK;
+ return 1;
+ } else {
+ return 0;
+ }
+
+ case EOF_OK :
+ if (cmd == EOF_CMD && file_received[mid_v]==NOT_RECV) {
+ machine_status[mid_v] = MACHINE_OK;
+ file_received[mid_v] = FILE_RECV;
+ return 1;
+ } else {
+ return 0;
+ }
+
+ case MISSING_PAGE :
+ if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0;
+ if (npage_v > nPages) return 0;
+ {
+ int i, *pi, page_v;
+ pi = pArray_ptr;
+ for (i = 0; i<npage_v; ++i) {
+ page_v = ntohl(pi[i]);
+ if (page_v<1 || page_v > nPages) continue; /*** make sure page_v starts with 1*/
+ missing_page_flag[page_v-1] = MISSING;
+ }
+ }
+ missing_pages[mid_v] += npage_v;
+ set_has_missing();
+ return 1;
+
+ case MISSING_TOTAL:
+ if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV || machine_status[mid_v] == MACHINE_OK)
+ return 0;
+ /* Consider to add: if npage_v >missing_pages[mid_v], ask to resend
+ [ likely no big gain ] */
+ total_missing_page[mid_v] += npage_v;
+ set_has_missing(); /* store the info about missing info */
+ machine_status[mid_v] = MACHINE_OK; /* machine_status serves as ack only */
+ return 1;
+
+ case SIT_OUT :
+ if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0;
+ fprintf(stderr, "*** %s sits-out-receiving %s\n",
+ id2name(mid_v), getFilename());
+ machine_status[mid_v] = MACHINE_OK;
+
+ if (!has_sick) ++skip_count;
+ set_has_sick();
+ return 1;
+
+ default :
+ if (verbose>=2) fprintf(stderr, "Unknown complaint: code = %d\n", code_v);
+ return 0;
+ } /* end of switch */
+ } /* end of if(readable) */
+
+ /* time out of readable() */
+ return -1;
+}
+
+int all_machine_ok()
+{
+ int i;
+ for(i=0; i<nMachines; ++i) {
+ if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE)
+ return FALSE; /* there is at least one machine that has not sent ack */
+ }
+ return TRUE; /* all machines are ready */
+}
+
+/* this is for the master to receive the acknowledgement. */
+void wait_for_ok(int code)
+{
+ int i, count, resp;
+ time_t tloc;
+ time_t rtime0, rtime1;
+
+ rtime0 = time(&tloc); /* reference time */
+
+ count = 0;
+ while (!all_machine_ok()) {
+ resp = read_handle_complaint(code);
+ if (resp==1) { /* if there is a complaint handled */
+ rtime0 = time(&tloc); /* reset the reference time */
+ continue;
+ }
+
+ if (resp==0) { /* irrelevant complaint received */
+ continue;
+ }
+
+ /* no complaints handled within the time period set by set_delay() */
+ rtime1 = time(&tloc); /* time since last complaints */
+ if ((rtime1-rtime0) >= ACK_WAIT_PERIOD) {
+ ++count;
+ if (count < my_ACK_WAIT_TIMES) {
+ if (verbose>=1 && (count % 10 == 0))
+ fprintf(stderr, " %d: resend cmd(%s) to machines:[ ", count, cmd_name[code]);
+ for(i=0; i<nMachines; ++i) {
+ if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) {
+ if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "%d ", i);
+ send_cmd(code, (int) i);
+ usleep(FAST);
+ }
+ }
+ if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "]\n");
+ rtime0 = rtime1;
+ } else { /* allowable period of time has passed */
+ fprintf(stderr, " Drop these bad machines:[ ");
+ for(i=0; i<nMachines; ++i) {
+ if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) {
+ /* fprintf(stderr, "%d ", i); */
+ fprintf(stderr, "%s ", id2name(i));
+ bad_machines[i]= BAD_MACHINE;
+ /*
+ The pages sent by the master will be ignored by
+ the bad machine, because the current_file nubmer
+ does not match.
+ */
+ }
+ }
+ fprintf(stderr, "]\n");
+ break;
+ }
+ }
+ }
+}
+
+int is_it_missing(int page)
+{
+ return (missing_page_flag[page]==MISSING) ? TRUE : FALSE;
+}
+
+int pr_missing_pages()
+{
+ int i, N, exit_code=0;
+ off_t delta;
+ unsigned int dp;
+
+ for(i=0; i<nMachines; ++i) {
+ char name[PATH_MAX];
+ N = get_total_missing_pages(i);
+ strcpy(name, id2name(i));
+ if (strlen(name)==0) sprintf(name, "machine(%3d)", i);
+ fprintf(stderr, "%s: #_missing_page_request = %6.2f%% = %d\n",
+ name, (double)N/((double)total_pages)*100.0, N);
+ }
+
+ if (skip_count>0) {
+ fprintf(stderr, "\nWarning: There are %d files which are not delivered.\n", skip_count);
+ exit_code = -1;
+ }
+
+ fprintf(stderr, "\nTotal number of files = %12d Pages w/o ack = %12u (%6.2f%%)\n",
+ total_entries(), pages_wo_ack(), (double)pages_wo_ack()/(double)real_total_pages*100.0);
+
+ dp = real_total_pages - total_pages;
+ fprintf(stderr, "Total number of pages = %12d Pages re-sent = %12u (%6.2f%%)\n",
+ total_pages, dp, (double)dp/(double)total_pages*100.0);
+
+ delta = (off_t)(real_total_bytes - total_bytes);
+ #ifdef _LARGEFILE_SOURCE
+ fprintf(stderr, "Total number of bytes = %12llu Bytes re-sent = %12llu (%6.2f%%)\n",
+ total_bytes, delta, (double)delta/(double)total_bytes*100.0);
+ #else
+ fprintf(stderr, "Total number of bytes = %12d Bytes re-sent = %12u (%6.2f%%)\n",
+ total_bytes, delta, (double)delta/(double)total_bytes*100.0);
+ #endif
+
+ return (exit_code);
+}
+
+/* count the number of bad machines */
+int nBadMachines()
+{
+ int i, count = 0;
+ for(i=0; i<nMachines; ++i) {
+ if (bad_machines[i] == BAD_MACHINE) ++count;
+ }
+
+ return count;
+}
+
+int choose_print_machines(char *stArray, char selection, char * msg_prefix)
+{
+ int i, count = 0;
+
+ for(i=0; i<nMachines; ++i) {
+ char line[PATH_MAX];
+ if (stArray[i] == selection) {
+ ++count;
+ strcpy(line, id2name(i));
+ if (count == 1) { fprintf(stderr, msg_prefix); }
+ if (strlen(line)==0)
+ fprintf(stderr, "%d ", i);
+ else
+ fprintf(stderr, "%s ", line);
+ }
+ }
+ if (count > 0) fprintf(stderr, "]\n");
+ return count;
+}
+
+int send_done_and_pr_msgs(double total_time, double t_page)
+{
+ int exit_code1 =0;
+ int exit_code2 =0;
+ int exit_code3 =0;
+
+ send_all_done_cmd();
+
+ /* exit_code1 !=0 if there are files that were not delivered due to change or skipped */
+ exit_code1 = pr_missing_pages();
+
+ fprintf(stderr, "Total time spent = %6.2f (min) ~ %6.2f (min/GB)\n\n",
+ total_time, total_time / ((double)real_total_bytes/1.0e9));
+ fprintf(stderr, "Send pages time = %6.2f (min) ~ %6.2f (min/GB)\n\n",
+ t_page, t_page / ((double)real_total_bytes/1.0e9));
+
+ exit_code2 = choose_print_machines(bad_machines,
+ BAD_MACHINE,
+ "Not synced for bad machines:[ ");
+
+ if (quitWithOneBad && nBadMachines() >=1) {
+ fprintf(stderr, "We choose to exit when at least one target is bad\n");
+ fprintf(stderr, "All files following the current one did not get delivered\n");
+ fprintf(stderr, "If resend cmd(CLOSE_FILE), then the current file may have been delivered to non-bad targets\n\n");
+ }
+
+ if (current_entry() < total_entries()) { /* if we exit prematurely */
+ exit_code3 = choose_print_machines(machine_status,
+ NOT_READY, "\nNot-ready machines:[ ");
+ }
+
+ if (verbose>=1) pr_rtt_hist();
+ return (exit_code1+exit_code3); /* 200807 removed exit_code2 because bad machines case has been dealt with
+ by -q. If no -q, then the bad machines are considered 'harmless' */
+}
+
+/* to do some cleanup before exit IF all machines are bad */
+void do_badMachines_exit()
+{
+ if ((quitWithOneBad && nBadMachines() < 1) ||
+ (!quitWithOneBad && (nBadMachines() < nMachines))) return;
+
+ if (quitWithOneBad)
+ fprintf(stderr, "One (or more) machine is bad. Exit!\n");
+ else
+ fprintf(stderr, "All machines are bad. Exit!\n");
+
+ send_done_and_pr_msgs(-1.0, -1.0);
+ exit(-1);
+}
+
+void do_cntl_c(int signo)
+{
+ fprintf(stderr, "Control_C interrupt detected!\n");
+
+ send_done_and_pr_msgs(-1.0, -1.0);
+ exit(-1);
+}
diff --git a/file_operations.c b/file_operations.c
new file mode 100644
index 0000000..d9a8733
--- /dev/null
+++ b/file_operations.c
@@ -0,0 +1,800 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file contains all the file-operations for multicatcher.
+ 20060404:
+ found a undetected omission in open_file().
+ AFter lseek(), we should write a dummy byte so that
+ multicatcher.zzz has the right file size to start with.
+ Otherwise, it will grow as syncing progresses.
+
+ Port the code to deal with Large_files.
+ esp in write_page(),
+ lseek(fout, (off_t)(page-1)*(off_t)PAGE_SIZE, SEEK_SET)
+ 200603:
+ Remove the meta-data operation.
+ Each file's info (stat) is transfered to targets during
+ the OPEN_FILE_CMD. extract_file_info() in this file
+ is to get that stat info for the current entry(file).
+
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Previously, memory mapped file was used for file IO
+ but later was changed to simple open() and lseek(), write().
+ This was also echoed in a patch by Clint Byrum <clint@careercast.com>.
+
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was originally called wish_list.c
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include <libgen.h>
+#include "main.h"
+
+extern int verbose;
+extern int machineID;
+
+char * baseDir = NULL;
+char * missingPages=NULL; /* starting address of the array of flags */
+int fout; /* file discriptor for output file */
+
+int toRmFirst = FALSE; /* remove existing file first and then sync */
+/* off_t total_bytes_written;*/
+unsigned int nPages;
+int had_done_zero_page;
+int current_file_id = 0; /* 1, 2, 3 ... or -1, -2 ... for backup */
+int backup; /* flag for a file that needs backup when deletion */
+char *backup_suffix = NULL;
+char my_backup_suffix[] = "_mcast_bakmmddhhmm";
+
+/* file stat_info which is transmitted from master */
+mode_t stat_mode;
+nlink_t stat_nlink;
+uid_t stat_uid;
+gid_t stat_gid;
+off_t stat_size;
+time_t stat_atime;
+time_t stat_mtime;
+
+char filename[PATH_MAX];
+char linktar[PATH_MAX];
+char fullpath[PATH_MAX];
+char tmp_suffix[L_tmpnam];
+
+void default_suffix()
+{
+ time_t t;
+ struct tm tm;
+ time(&t);
+ localtime_r(&t, &tm);
+ sprintf(my_backup_suffix, "_mcast_bak%02d%02d%02d%02d",
+ tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min);
+ backup_suffix = my_backup_suffix;
+ return;
+}
+
+void get_tmp_suffix()
+{
+ /* this is called once in each multicatcher */
+ char tmp[L_tmpnam];
+ tmpnam_r(&tmp[0]);
+ strcpy(tmp_suffix, basename(tmp));
+}
+
+int make_backup()
+{
+ char fnamebak[PATH_MAX];
+ if (strlen(fullpath) + strlen(backup_suffix) > (PATH_MAX-1)) {
+ fprintf(stderr, "backup filename too long\n");
+ return FAIL;
+ };
+
+ if (!backup) return SUCCESS; /* if not match with pattern, skip the backup */
+
+ sprintf(fnamebak, "%s%s", fullpath, backup_suffix);
+ /*
+ The backup scheme is as follows.
+ ln file file.bak (mv file file.bak would cause file to non-exist for a short while)
+ mv file.new file
+ */
+ if (link(fullpath, fnamebak) != 0) {
+ if (errno != ENOENT && errno != EINVAL) {
+ fprintf(stderr,"hardlink %s => %s : %s\n",fullpath, fnamebak, strerror(errno));
+ return FAIL;
+ }
+ }
+ if (verbose >= 2) {
+ fprintf(stderr, "backed up %s to %s\n",fullpath, fnamebak);
+ }
+ return SUCCESS;
+}
+
+void get_full_path(char * dest, char * sub_path)
+{
+ /* prepend the sub_path with baseDir --> dest */
+ strcpy(dest, baseDir);
+ strcat(dest, "/");
+ strcat(dest, sub_path);
+}
+
+void get_tmp_file(char * tmp)
+{
+ /*char *fncopy;*/
+ /* ******* change to filename_mcast.fileabc%$&? */
+ /* fncopy = strdup(filename); dirname change the string content */
+ strcpy(tmp, baseDir);
+ strcat(tmp, "/");
+ strcat(tmp, filename);
+ strcat(tmp, "_");
+ strcat(tmp, TMP_FILE);
+ strcat(tmp, tmp_suffix);
+ /* free(fncopy); */
+}
+
+int my_unlink(const char *fn)
+{
+ if (verbose>=2)
+ fprintf(stderr, "deleting file: %s\n", fn);
+ if (unlink(fn) != 0) {
+ if (errno==ENOENT) {
+ return SUCCESS;
+ } else {
+ /* NOTE: unlink() could not remove files which do not have w permission!*/
+ /* resort to shell command */
+ char cmd[PATH_MAX];
+ sprintf(cmd, "rm -f %s", fn);
+ if (system(cmd)!=0) {
+ fprintf(stderr, "'rm -f' fails for %s\n", fn);
+ return FAIL;
+ }
+ }
+ }
+ return SUCCESS;
+}
+
+int my_touch(const char*fn)
+{
+ int fo;
+ if( (fo = open(fn, O_RDWR | O_CREAT | O_TRUNC,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) < 0) {
+ perror(fn);
+ return FAIL;
+ }
+
+ /* set the size of the output file */
+ if(lseek(fo, 0, SEEK_SET) == -1) {
+ fprintf(stderr, "cannot seek for %s\n", fn);
+ perror(fn);
+ return FAIL;
+ }
+ close(fo);
+ return SUCCESS;
+}
+
+int extract_file_info(char * buf, int n_file, unsigned int n_pages)
+{
+ /*
+ Along with OPEN_FILE_CMD, the data area in rec_buf contains
+ (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0
+ where the stat_string contains the buf in
+ sprintf(buf, "%lu %lu %lu %lu %lu %lu %lu", st.st_mode, st.st_nlink,
+ st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime);
+ */
+ char * pc = &buf[0];
+
+ #ifdef _LARGEFILE_SOURCE
+ if (sscanf(pc, "%u %u %u %u %llu %lu %lu %d", &stat_mode, &stat_nlink,
+ &stat_uid, &stat_gid, &stat_size, &stat_atime, &stat_mtime,
+ &toRmFirst) != 8)
+ return FAIL;
+ #else
+ if (sscanf(pc, "%u %u %u %u %lu %lu %lu %d", &stat_mode, &stat_nlink,
+ &stat_uid, &stat_gid, &stat_size, &stat_atime, &stat_mtime,
+ &toRmFirst) != 8)
+ return FAIL;
+ #endif
+
+ /* fprintf(stderr, "size= %llu\n", stat_size); *********/
+
+ pc += (strlen(pc) + 1);
+ strcpy(filename, pc);
+ get_full_path(fullpath, filename);
+
+ linktar[0] = '\0';
+ if (S_ISLNK(stat_mode) || stat_nlink > 1) { /* if it is a softlink or hardlink */
+ pc += (strlen(pc) +1);
+ strcpy(linktar, pc);
+ }
+
+ nPages = n_pages;
+ current_file_id = n_file;
+ backup = (current_file_id < 0);
+ had_done_zero_page = FAIL;
+ /*total_bytes_written = 0;*/
+ return SUCCESS;
+}
+
+int open_file()
+{
+ int i;
+
+ /*
+ sometimes for disk space reason, it is necessary
+ to first remove the file and sync.
+ If toReplace is true, the backup option should be off.
+ */
+ if (toRmFirst) {
+ if (!my_unlink(fullpath)) {
+ fprintf(stderr, "Replacing ");
+ perror(filename);
+ return FAIL;
+ }
+ }
+
+ /* fprintf(stderr, "%d %d %d\n", stat_mode, stat_nlink, stat_size); *********/
+ if (S_ISREG(stat_mode) && stat_nlink == 1) {
+ /* if it is a regular file and not a hardlink */
+ char tmpFile[PATH_MAX];
+ get_tmp_file(tmpFile);
+
+ my_unlink(tmpFile); /* make sure it's not there from previous runs */
+
+ if( (fout = open(tmpFile, O_RDWR | O_CREAT | O_TRUNC,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) < 0) {
+ fprintf(stderr, "cannot open() %s for writing. \n", tmpFile);
+ perror(tmpFile);
+ return FAIL;
+ }
+
+ /* set the size of the output file (see Steve's book on page 411) */
+ if(lseek(fout, stat_size - 1 , SEEK_SET) == -1) {
+ #ifdef _LARGEFILE_SOURCE
+ fprintf(stderr, "lseek() error for %s with size = %llu\n", tmpFile, stat_size);
+ #else
+ fprintf(stderr, "lseek() error for %s with size = %u\n", tmpFile, (unsigned int)stat_size);
+ #endif
+ perror(tmpFile);
+ close(fout);
+ return FAIL;
+ }
+ if (write(fout, "", 1) != 1) {
+ #ifdef _LARGEFILE_SOURCE
+ fprintf(stderr, "write() error for %s with size = %llu\n", tmpFile, stat_size);
+ #else
+ fprintf(stderr, "write() error for %s with size = %u\n", tmpFile, (unsigned int)stat_size);
+ #endif
+ perror(tmpFile);
+ close(fout);
+ return FAIL;
+ }
+ }
+
+ /* init missingPages flags */
+ if (!missingPages) free(missingPages);
+ missingPages = malloc(sizeof(char) * nPages);
+ for(i=0; i < nPages; ++i) missingPages[i] = MISSING;
+
+ if (verbose>=2) fprintf(stderr, "Ready to receive file = %s\n", filename);
+ return SUCCESS;
+}
+
+int close_file()
+{
+ /* delete missingPages flags which was malloc-ed in open_file() */
+ free(missingPages);
+ missingPages = NULL;
+
+ if (S_ISREG(stat_mode) && stat_nlink == 1) {
+ /* if it is a regular file and not a hardlink */
+ char tmpFile[PATH_MAX];
+ struct stat stat;
+
+ if ((fout != -1) && (close(fout) != 0)) {
+ #ifdef _LARGEFILE_SOURCE
+ fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %llu \n",
+ filename, stat_size);
+ #else
+ fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %u \n",
+ filename, (unsigned int)stat_size);
+ #endif
+ perror("close");
+ return FAIL;
+ }
+ fout = -1; /* if the following fails, the reentry wont do munmap() */
+
+ /* the real work */
+ /* get_full_path(oldFile, filename); **** unnecessary fullpath is the oldFile */
+
+ get_tmp_file(tmpFile);
+
+ /* 8/14/2002
+ If there was a hardware (disk IO) problem
+ the sync should not proceed.
+ ** Add the following checking.
+ */
+ if (lstat(tmpFile, &stat)<0) {
+ perror("ERROR: close_file() cannot lstat the tmp file\n");
+ return FAIL;
+ }
+
+ if (backup && !make_backup(fullpath)) { /* make a hardlink oldFile => backup_file */
+ fprintf(stderr, "fail to make backup for %s\n", fullpath);
+ return FAIL;
+ }
+ if (rename(tmpFile, fullpath)<0) {
+ perror("ERROR: close_file():rename() \n");
+ return FAIL;
+ }
+
+ /* 20071016 in rare occasion, the written file has not the right size */
+ if (lstat(fullpath, &stat)<0) {
+ fprintf(stderr, "ERROR: close_file() cannot lstat %s\n", fullpath);
+ return FAIL;
+ }
+ if (stat_size != stat.st_size) {
+ fprintf(stderr, "ERROR: close_file() filesize != incoming-size\n");
+ return FAIL;
+ }
+ }
+
+ /* for debug
+ if (verbose>=2) fprintf(stderr, "total bytes written %llu for file %s\n",
+ total_bytes_written, filename);
+ */
+
+ return SUCCESS;
+}
+
+int rm_tmp_file()
+{
+ char tmpFile[PATH_MAX];
+
+ if ((fout != -1) && (close(fout) != 0)) {
+ #ifdef _LARGEFILE_SOURCE
+ fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %llu \n",
+ filename, stat_size);
+ #else
+ fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %u \n",
+ filename, (unsigned int)stat_size);
+ #endif
+ perror("rm_tmp");
+ return FAIL;
+ }
+ fout = -1;
+
+ get_tmp_file(tmpFile);
+
+ return (my_unlink(tmpFile));
+}
+
+int nPages_for_file()
+{
+ return nPages;
+};
+
+/* return total number of missing pages */
+int get_missing_pages()
+{
+ int i, result=0;
+
+ for(i=0; i < nPages; ++i)
+ if ((missingPages[i]) == MISSING) ++result;
+ return result;
+}
+
+int is_missing(int index)
+{
+ return (missingPages[index] == MISSING) ? TRUE : FALSE;
+}
+
+void page_received(int index)
+{
+ missingPages[index] = RECEIVED;
+}
+
+/*
+ write() in write_page() may block forever.
+ This function is to check if write() is ready.
+*/
+int writable(int fd)
+{
+ struct timeval write_tv;
+ fd_set wset;
+ FD_ZERO(&wset);
+ FD_SET(fd, &wset);
+
+ write_tv.tv_sec = WRITE_WAIT_SEC;
+ write_tv.tv_usec = WRITE_WAIT_USEC;
+ return (select(fd + 1, NULL, &wset, NULL, &write_tv)==1);
+}
+
+void write_page(int page, char *data_ptr, int bytes)
+{
+ /* page = page number starting with 1 */
+ if (page < 1 || page > nPages) return;
+
+ /* Do we need to write this page? */
+ if (is_missing(page-1)){
+ if (!writable(fout)) return;
+
+ /* Write the data */
+ if (lseek(fout, (off_t)(page-1)*(off_t)PAGE_SIZE, SEEK_SET)<0) {
+ if (verbose>=1) {
+ fprintf(stderr, "ERROR: write_page():lseek() at page %d for %s\n",
+ page, filename);
+ perror("ERROR");
+ }
+ return;
+ }
+ if (write(fout, data_ptr, bytes)<0) {
+ /* write IO error !!! */
+ perror("ERROR");
+ fprintf(stderr, "write_page():write() error: at page %d for %s\n", page, filename);
+ return;
+ }
+
+ /* Mark the page as received in our wish list */
+ page_received(page-1);
+ /*total_bytes_written += bytes;*/
+ } else {
+ /* If we don't need to write it, just return */
+ if (verbose >=2) {
+ fprintf(stderr, "Already have page %d for %d:Ignoring\n", page, current_file_id);
+ }
+ }
+ return;
+}
+
+/* For files whose size is 0 */
+int touch_file()
+{
+ if (verbose >=2)
+ fprintf(stderr, "touching file: %s\n", fullpath);
+
+ my_unlink(fullpath);
+ return my_touch(fullpath);
+ /* system()
+ VERY time in-efficient
+ char cmd[PATH_MAX];
+ sprintf(cmd, "touch %s", fullpath);
+ return (system(cmd)==0);
+ */
+}
+
+int delete_file(int to_check_dir_type)
+{
+ struct stat st;
+ char fp[PATH_MAX];
+ int trailing_slash;
+ int type_checking;
+
+ strcpy(fp, fullpath);
+
+ /* remove trailing slash if any -- for deletion-'type' checking */
+ if (to_check_dir_type) {
+ char *pc;
+ type_checking = TRUE;
+ pc = &fp[0] + strlen(fp) - 1;
+ if (*pc=='/') {
+ *pc = '\0';
+ trailing_slash = TRUE;
+ } else {
+ trailing_slash = FALSE;
+ };
+ } else {
+ type_checking = FALSE;
+ }
+
+ if(lstat(fp, &st) < 0) {
+ /* already gone ? */
+ return SUCCESS;
+ }
+ if (S_ISREG(st.st_mode) || S_ISLNK(st.st_mode)) { /* delete a file or link */
+ if (verbose>=2)
+ fprintf(stderr, "deleting file: %s\n", fp);
+
+ if (type_checking && trailing_slash) { /* intended to remove a directory when it is not */
+ return FAIL;
+ }
+
+ if (backup && S_ISREG(st.st_mode) && !make_backup(fp)) {/* backup regular file */
+ return FAIL; /* failed to make_backup */
+ }
+ return (my_unlink(fp));
+ } else if (S_ISDIR(st.st_mode)) { /* remove a directory */
+ char cmd[PATH_MAX];
+ if (verbose>=2)
+ fprintf(stderr, "deleting directory: %s\n", fp);
+
+ if (type_checking && (!trailing_slash)) { /* intended to remove a non-dir when it is directory */
+ return FAIL;
+ }
+
+ sprintf(cmd, "rm -rf %s", fp); /* remove everything in dir, watch out for this */
+ return (system(cmd)==0);
+ }
+ /* not file, link, directory */
+ fprintf(stderr, "unrecognized file_mode for %s\n", fp);
+ return FAIL;
+}
+
+/* send complaints to the master for missing data */
+int ask_for_missing_page()
+{
+ int i, n=0, total=0;
+
+ /*
+ Send missing page indexes if any
+ */
+ init_fill_ptr();
+ for(i=0; i < nPages; ++i) {
+ if (missingPages[i] == MISSING ) {
+ ++n;
+ ++total;
+ if (n > MAX_NPAGE) {
+ /* send previous missing page-indexes */
+ send_complaint(MISSING_PAGE, machineID, MAX_NPAGE, current_file_id);
+ init_fill_ptr();
+ n = 1;
+ }
+ /* fill in one page index */
+ fill_in_int(i+1); /* origin = 1 */
+ }
+ }
+ /* send the rest of missing pages complaint */
+ if (n>0) send_complaint(MISSING_PAGE, machineID, n, current_file_id);
+
+ return total; /* there are missing pages */
+}
+
+void missing_page_stat()
+{
+ int i, n=0, sum=0, last=-1;
+
+ for(i=0; i < nPages; ++i) {
+ if (missingPages[i] == MISSING ) {
+ ++n;
+ if (last<0) {
+ sum += i;
+ } else {
+ sum += (i - last);
+ }
+ last = i;
+ }
+ }
+ if (n>0) {
+ double a = sum;
+ double b = n;
+ fprintf(stderr, "file= %d miss= %d out-of %d avg(delta_index) = %f\n",
+ current_file_id, n, nPages, a/b);
+ }
+}
+
+void my_perror(char * msg)
+{
+ char fn[PATH_MAX];
+ sprintf(fn, "%s - %s", fullpath, msg);
+ perror(fn);
+}
+
+int set_owner_perm_times()
+{
+ int state = SUCCESS;
+
+ /* set owner */
+ if (lchown(fullpath, stat_uid, stat_gid)!=0) {
+ my_perror("chown");
+ state = FAIL;
+ }
+
+ /*
+ set time and permission.
+ Don't try to set the time and permission on a link
+ */
+ if (!S_ISLNK(stat_mode)) {
+ struct utimbuf times;
+ if (chmod(fullpath, stat_mode)!=0) {
+ my_perror("chmod");
+ state = FAIL;
+ }
+
+ times.actime = stat_atime;
+ times.modtime = stat_mtime;
+ if (utime(fullpath, &times)!=0) {
+ my_perror("utime");
+ state = FAIL;
+ }
+ }
+ return state;
+}
+
+int update_directory()
+{
+ struct stat st;
+ int exists;
+ char fp[PATH_MAX], *pc;
+
+ if (verbose>=2)
+ fprintf(stderr, "Updating dir: %s\n", fullpath);
+
+ /* if fullpath is a softlink that points to a dir,
+ and it has a trailing '/',
+ lstat() will view it as a directory !
+ So, we remove the trailing '/' before lstat() */
+ strcpy(fp, fullpath);
+ pc = &fp[0] + strlen(fp) - 1;
+ if (*pc=='/') *pc = '\0';
+
+ if(lstat(fp, &st) < 0) {
+ switch(errno) {
+ case ENOENT:
+ exists = FALSE;
+ break;
+ default:
+ my_perror("lstat");
+ return FAIL;
+ }
+ } else {
+ exists = TRUE;
+ }
+
+ if (!exists) {
+ /* There's nothing there, so create dir */
+ if (mkdir(fp, stat_mode) < 0){
+ my_perror("mkdir");
+ return FAIL;
+ }
+ return SUCCESS;
+ } else if (!S_ISDIR(st.st_mode)) {
+ /* If not a directory delete what is there */
+ if (unlink(fp)!=0){
+ my_perror("unlink");
+ return FAIL;
+ }
+ if (verbose>=2)
+ fprintf(stderr, "Deleted file %s to replace with directory\n", fp);
+ if (mkdir(fp, stat_mode) < 0){
+ my_perror("mkdir");
+ return FAIL;
+ }
+ return SUCCESS;
+ } else {
+ /* If dir exists, just chmod */
+ chmod(fp, stat_mode);
+ /*** 20070410: changing mtime of a dir can cause NFS to confuse
+ See http://lists.samba.org/archive/rsync/2004-May/009439.html
+ So, I comment it out in the following
+
+ struct utimbuf times;
+ times.actime = stat_atime;
+ times.modtime = stat_mtime;
+ ***/
+ /*
+ if (utime(fullpath, &times) < 0) {
+ my_perror("utime");
+ }
+ */
+ return SUCCESS;
+ }
+}
+
+int update_directory0()
+{
+ DIR *d;
+
+ struct utimbuf times;
+ times.actime = stat_atime;
+ times.modtime = stat_mtime;
+
+ if (verbose>=2)
+ fprintf(stderr, "Creating dir: %s\n", fullpath);
+
+ d = opendir(fullpath);
+ if (d == NULL) {
+ switch(errno) {
+ case ENOTDIR:
+ /* If not a directory delete what is there */
+ if (unlink(fullpath)!=0){
+ my_perror("unlink");
+ return FAIL;
+ }
+ if (verbose>=2)
+ fprintf(stderr, "Deleted file %s to replace with directory\n", fullpath);
+ /* Fall through to ENOENT */
+ case ENOENT:
+ /* There's nothing there, so create dir */
+ if (mkdir(fullpath, stat_mode) < 0){
+ my_perror("mkdir");
+ return FAIL;
+ }
+ return SUCCESS;
+ default:
+ my_perror("opendir");
+ return FAIL;
+ }
+ } else {
+ /* If dir exists, just chmod */
+ closedir(d);
+ chmod(fullpath, stat_mode);
+ /*** 20070410: changing mtime of a dir can cause NFS to confuse
+ See http://lists.samba.org/archive/rsync/2004-May/009439.html
+ So, I comment it out in the following ***/
+ /*
+ if (utime(fullpath, &times) < 0) {
+ my_perror("utime");
+ }
+ */
+ return SUCCESS;
+ }
+}
+
+int check_zero_page_entry()
+{
+ /* when total_pages = 0, the entry can be
+ an empty file
+ softlink (hardlink),
+ a directory
+ */
+ if (had_done_zero_page) return SUCCESS; /* to avoid doing it again */
+
+ if (S_ISDIR(stat_mode)) { /* a directory */
+ if (!update_directory()) {
+ had_done_zero_page = FAIL;
+ return FAIL;
+ }
+ } else if (S_ISLNK(stat_mode)) { /* Is it a softlink? */
+ if (verbose>=2)
+ fprintf(stderr, "Making softlink: %s -> %s\n", fullpath, linktar);
+ delete_file(FALSE); /* remove the old one at fullpath */
+ if (symlink(linktar, fullpath) < 0) {
+ my_perror("symlink");
+ had_done_zero_page = FAIL;
+ return FAIL;
+ }
+ } else if (stat_nlink > 1) { /* hardlink */
+ char fn[PATH_MAX];
+ get_full_path(fn, linktar); /* linktar is a relative path from synclist */
+ if (verbose>=2)
+ fprintf(stderr, "Making a hardlink: %s => %s\n", fullpath, fn);
+ my_unlink(fullpath); /* remove the old one */
+ if (link(fn, fullpath)!=0) {
+ my_perror("link");
+ had_done_zero_page = FAIL;
+ return FAIL;
+ }
+ } else {
+ /* it must be a regular file */
+
+ if (!touch_file()) {
+ had_done_zero_page = FAIL;
+ return FAIL;
+ } else {
+ set_owner_perm_times();
+ }
+ }
+ had_done_zero_page = SUCCESS;
+ return SUCCESS;
+}
+
diff --git a/global.c b/global.c
new file mode 100644
index 0000000..64104eb
--- /dev/null
+++ b/global.c
@@ -0,0 +1,35 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+char *cmd_name[] = { "TIME-OUT",
+ "TEST",
+ "SEND_DATA",
+ "RESEND_DATA",
+ "OPEN_FILE",
+ "EOF",
+ "CLOSE_FILE",
+ "CLOSE_ABORT",
+ "ALL_DONE",
+ "SEL_MONITOR",
+ "NULL"};
+
+int verbose = 0; /* = 0 little detailed output, = 1,2 ... = n a lot more details */
+int machineID = -1; /* used for multicatcher */
+
diff --git a/id_map.c b/id_map.c
new file mode 100644
index 0000000..8b8dcaa
--- /dev/null
+++ b/id_map.c
@@ -0,0 +1,74 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <string.h>
+
+void strip(char * str);
+
+/* place to hold the array of string */
+char ** machine_names = NULL; /* array of (char*) */
+int nTargets=0;
+
+void get_machine_names(char * filename)
+{
+ FILE *fd;
+ char line[PATH_MAX];
+ int count=0;
+
+ if ((fd = fopen(filename, "r")) == NULL) {
+ fprintf(stderr, "Cannot open file -- %s \n", filename);
+ return;
+ }
+ while (fgets(line, PATH_MAX, fd) != NULL) {
+ strip(line);
+ if (strlen(line) != 0) ++count;
+ }
+ if (count == 0) {
+ fclose(fd);
+ fprintf(stderr, "No machine names in the file = %s\n", filename);
+ return;
+ }
+
+ nTargets = count;
+
+ rewind(fd);
+ machine_names = malloc(nTargets * sizeof(void*));
+
+ line[0] = '\0';
+ count = 0;
+ while(fgets(line, PATH_MAX, fd) != NULL) {
+ strip(line);
+ if (strlen(line)==0) continue;
+ machine_names[count] = (char*)strdup(line);
+ line[0] = '\0';
+ ++count;
+ }
+ fclose(fd);
+}
+
+char * id2name(int id)
+{
+ return (machine_names) ? machine_names[id] : "";
+}
diff --git a/main.h b/main.h
new file mode 100644
index 0000000..48f8c55
--- /dev/null
+++ b/main.h
@@ -0,0 +1,189 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Following the suggestion and the patch by Clint Byrum <clint@careercast.com>,
+ I added more control to selectively print out messages.
+ The control is done by the statement 'if (version >= n)'
+
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef __main_h
+#define __main_h
+
+#include <dirent.h>
+#include <time.h>
+#include <utime.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
+#include <arpa/inet.h> /* inet(3) functions */
+#include <errno.h>
+#include <fcntl.h> /* for nonblocking */
+#include <sys/ioctl.h>
+#include <netdb.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/stat.h> /* for S_xxx file mode constants */
+#include <sys/uio.h> /* for iovec{} and readv/writev */
+#include <unistd.h>
+#include <sys/wait.h>
+#include <sys/time.h> /* timeval{} for select() */
+#include <sys/times.h>
+#include <limits.h>
+
+#define VERSION "4.0.1"
+
+/* logic values */
+#define FALSE 0
+#define TRUE 1
+#define FAIL (FALSE)
+#define SUCCESS (TRUE)
+#define GOOD_EXIT 0
+#define BAD_EXIT -1
+
+/* Ports and addresses */
+#define PORT 8888 /* for multicast */
+#define FLOW_PORT (PORT-1) /* for flow-control */
+#define MCAST_ADDR "239.255.67.92"
+#define MCAST_TTL 1
+#define MCAST_LOOP FALSE
+#define MCAST_IF NULL
+
+/*
+ Handling socket's receive buffer on the target machine:
+ if the available data size in the receiveing buffer is larger
+ than TOO_MUCH then a TOO_FAST complaint is triggered.
+ The master will then sleep for USEC_TO_IDLE
+ Currently, this is not effective.
+*/
+#define TOO_FAST_LIMIT (TOTAL_REC_PAGE / 2) /* if half is full, then too fast */
+#define TOO_MUCH (TOO_FAST_LIMIT * PAGE_BUFFSIZE)
+#define USEC_TO_IDLE 1000000
+
+/* TIMING stuff */
+#define FAST 5000 /* usec */
+#define DT_PERPAGE 6000 /* usec */
+#define FACTOR 90 /* interpage interval = FACTOR * DT_PERPAGE or DT_PERPAGE*/
+#define SECS_FOR_KILL 30 /* time(sec) allowed for 'kill -9 pid' to finish */
+
+/* time for the master to wait for the acknowledgement */
+#define ACK_WAIT_PERIOD 1 /* secs (from time()); */
+#define ACK_WAIT_TIMES 60 /* wait for this many periods */
+
+#define SICK_RATIO (0.9)
+#define SICK_THRESHOLD (50) /* SICK FOR such many TIMES is really sick */
+
+/* max wait time for write() a page of PAGE_SIZE -- 100 msec */
+#define WRITE_WAIT_SEC 0
+#define WRITE_WAIT_USEC 100000
+
+#define SET_MON_WAIT_TIMES 6000 /* time = this number * FAST */
+#define NO_FEEDBACK_COUNT_MAX 10
+#define SWITCH_THRESHOLD 50 /* to avoid switching monitor too frequently
+ because of small diff in missing_pages */
+
+/* complaints */
+#define TOO_FAST 100
+#define OPEN_OK 200
+#define CLOSE_OK 300
+#define MISSING_PAGE 400
+#define MISSING_TOTAL 500
+#define EOF_OK 600
+#define SIT_OUT 700
+#define PAGE_RECV 800
+#define MONITOR_OK 900
+
+/* Sizes */
+/* 20060427: removed size_t which is arch-dependent */
+#define PAGE_SIZE 64512
+#define HEAD_SIZE (sizeof(int) + 2 * sizeof(int) + 2 * sizeof(int))
+#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE)
+#define TOTAL_REC_PAGE 20 /* change to 4 in case hit the OS limit in buf size */
+
+#define FLOW_HEAD_SIZE (sizeof(int)*4)
+#define FLOW_BUFFSIZE (PAGE_SIZE+FLOW_HEAD_SIZE)
+#define MAX_NPAGE (PAGE_SIZE / sizeof(int))
+/*
+ Modes and command codes:
+ The numerical codes are also the index to retrieve the command names
+ for printing in complaints.c
+*/
+#define TIMED_OUT 0
+#define TEST 1
+#define SENDING_DATA 2
+#define RESENDING_DATA 3
+#define OPEN_FILE_CMD 4
+#define EOF_CMD 5
+#define CLOSE_FILE_CMD 6
+#define CLOSE_ABORT_CMD 7
+#define ALL_DONE_CMD 8
+#define SELECT_MONITOR_CMD 9
+#define NULL_CMD 10
+
+/* machine status ----- for caster */
+#define MACHINE_OK_MISSING_PAGES '\2'
+#define MACHINE_OK '\1'
+#define NOT_READY '\0'
+
+#define BAD_MACHINE '\1'
+#define GOOD_MACHINE '\0'
+
+#define FILE_RECV '\1'
+#define NOT_RECV '\0'
+
+/* representation of all-targets for sends */
+#define ALL_MACHINES -1
+
+/* PAGE STATUS */
+#define MISSING '\0'
+#define RECEIVED '\1'
+
+/* MACHINE STATE ----- for catcher */
+#define IDLE_STATE 0
+#define GET_DATA_STATE 1
+#define DATA_READY_STATE 2
+#define SICK_STATE 3
+
+/*
+ The following two are info to be packed into
+ meta data to represent either file or directory deletion.
+*/
+/* SPECIAL # of PAGES to signal deleting action */
+#define TO_DELETE (-1)
+
+/* temporary file name prefix for transfering to */
+#define TMP_FILE "mrsync."
+
+#include "proto.h"
+
+#endif
diff --git a/mrsync.py b/mrsync.py
new file mode 100755
index 0000000..bd8c0c7
--- /dev/null
+++ b/mrsync.py
@@ -0,0 +1,284 @@
+#!/usr/bin/env python
+
+import os,sys,string,time,getopt;
+
+my_module_path = os.path.dirname(sys.argv[0]);
+sys.path.append(my_module_path);
+from mrsync_config import *
+
+host = os.uname()[1];
+
+def bin(path, bin_name):
+ "return the full-path of a binary code"
+ return '%s/%s' % (path, bin_name);
+
+def isPathThere(path):
+ "check the existence of path on this master machine"
+ if not os.path.exists(path):
+ print >>sys.stderr, 'Path (%s) could not be found on %s' % (path, host);
+ sys.exit(-1);
+
+map(isPathThere, [bin(binDir, x) for x in codes]);
+
+def printTimeMsg(msg):
+ print >>sys.stderr, 'Time = %s %s' % (time.ctime(time.time()), msg);
+
+def prMulticastLog(msg):
+ "append msg to the log file"
+ open(multicast_log, 'a').write('%s %s\n' % (time.ctime(time.time()), msg));
+
+(catcher_err_log, goodTargetsFile, syncFileList) = \
+ map(lambda x:
+ '%s.%s' % (x, ('%02d'*5) % time.localtime()[1:6]),
+ (catcher_err_log, goodTargetsFile, syncFileList));
+
+def usage():
+ print """mrsync.py (to sync files from one to many machines by multicast)
+ Option list:
+ [ -v <verbose_level 0-2 (2 is for debug)> ]
+ [ -w <ack_wait_times default= 60 (secs) ]
+ [ -r <remote shell to invoke multicatcher, default=rsh> ]
+ [ -b <remote_bin_dir_path for multicatcher, default=%s> ]
+ [ -o <more rsync options such as --include --exclude,
+ to be appended to default min_opts = %s>]
+ [ -x flag to turn off monitor mechanism (not fully tested and not recommended) ]
+ ----- Essential options --------------------------------------------------------
+ -m <machine_list_file_path or csv_list (name1,name2...)>
+ -s <source_data_path>
+ [ -t <target_data_path; default = that in -s option> ]
+ [ -l <list of files(wildcards) to be synced, can be a filepath or csv_list>
+ mrsync by default uses rsync to find the list unless this option is given. ]
+ ----- mcast options ------------------------------------------------------------
+ [ -A <my_MCAST_ADDR such as 239.255.67.92> ]
+ [ -P <my_PORT such as 9000> ]
+ [ -T <my_TTL default=1> ]
+ [ -L flag to turn on mcast_LOOP. default is off ]
+ [ -I <my_MCAST_IF default=NULL> ]
+ """ % (rBinDir, min_rsync_opt%reshell);
+
+# --- handle command line
+opts, args = getopt.getopt(sys.argv[1:], 'hxv:w:r:b:o:m:s:t:l:A:P:T:LI:', []);
+
+if len(opts)==0 or len(args)>0:
+ usage();
+ sys.exit(-1);
+
+verbose = 0;
+ack_wait_times = -1;
+without_monitor = False;
+machineListFile = '';
+sourcePath = '';
+targetPath = '';
+synclist = '';
+rsync_opts = min_rsync_opt % reshell;
+
+mcast_addr = '';
+port = -1;
+ttl = 1;
+loop = False;
+mcast_if = '';
+
+if not len(opts) == 0:
+ for o,v in opts:
+ if o=='-v':
+ verbose = string.atoi(v);
+ elif o=='-w':
+ ack_wait_times = string.atoi(v);
+ elif o=='-h':
+ usage();
+ sys.exit(0);
+ elif o=='-r':
+ reshell = v;
+ elif o=='-b':
+ rBinDir = v;
+ elif o=='-o':
+ rsync_opts += (' %s' % v)
+ elif o=='-m':
+ machineListFile = v;
+ elif o=='-s':
+ sourcePath = v;
+ elif o=='-t':
+ targetPath = v;
+ elif o=='-l':
+ synclist = v;
+ elif o=='-A':
+ mcast_addr = v;
+ elif o=='-P':
+ port = string.atoi(v);
+ elif o=='-T':
+ ttl = string.atoi(v);
+ elif o=='-L':
+ loop = True;
+ elif o=='-I':
+ mcast_if = v;
+ elif o=='-x':
+ without_monitor = True;
+
+if verbose>=1: print 'mrsync version 4.0.0';
+
+if not machineListFile or not sourcePath:
+ print >>sys.stderr, 'Essential options (-m -s) should be specified.';
+ usage();
+ sys.exit(-1);
+
+isPathThere(sourcePath);
+if not targetPath: targetPath = sourcePath;
+
+# ------------ get machine list
+machines = (os.path.exists(machineListFile) and
+ [x[:-1] for x in open(machineListFile).readlines()] or
+ machineListFile.split(','));
+
+# clean up the names
+machines = filter(lambda x: x!='', machines);
+machines = [x.strip() for x in machines];
+
+if host in machines:
+ if verbose>=1: print 'remove myself (%s) from machine list...' % host;
+ machines.remove(host);
+
+# ------------ get the syncList from the first good machine
+# the list is stored in syncFileList for multicaster.
+import cmdToTarget;
+
+def cleanup(file):
+ "remove a file if it exists"
+ if os.path.exists(file): os.unlink(file);
+
+def get_synclist_from_cmdline(tmp):
+ "extracts synclist from cmdline option, outputs results into tmp_file"
+ cleanup(tmp);
+ if verbose>=1: print >>sys.stderr, 'extracting %s...' % synclist;
+
+ list = (os.path.exists(synclist) and
+ [x[:-1] for x in open(synclist).readlines()] or
+ synclist.split(','));
+
+ if len(list)==0:
+ print >>sys.stderr, "Empty sync_list in cmd_line option %s" % synclist;
+ sys.exit(-1);
+
+ def pr_relative_path(fullpath):
+ open(tmp, 'a').write('%s\n' % fullpath.replace(sourcePath+'/', ''));
+
+ for item in list:
+ " each item can be a pattern for files"
+ import glob
+ map(pr_relative_path, glob.glob('%s/%s' % (sourcePath, item)));
+
+
+def get_synclist_from_rsync(tmp):
+ "use rsync to get a list of to-be-synced files. results are put in tmp file"
+ cleanup(tmp);
+ for machine in machines:
+ # check if this machine is ok to rsh (ssh)
+ if (not cmdToTarget.isMachineOK(reshell, machine)): # if not go to next machine
+ continue;
+
+ if verbose>=1:
+ print >>sys.stderr, 'Get to-be-synced files from %s...' % machine;
+
+ cmd = ' '.join(filter(lambda x: x,
+ [rsyncPath, '--rsync-path='+remote_rsyncPath, \
+ (reshell != 'rsh' and \
+ rsync_opts.replace('--rsh=rsh', '--rsh=%s' % reshell) \
+ or rsync_opts),
+ sourcePath+'/',
+ '%s:%s/' % (machine, targetPath),
+ '> %s 2>&1; ' % tmp]));
+
+ if verbose>=1: print >>sys.stderr, cmd;
+ os.system(cmd);
+ break;
+
+def tmp_synclist():
+ "intermediate synclist filename"
+ return '/tmp/%s' % os.path.basename(syncFileList);
+
+(synclist and get_synclist_from_cmdline or get_synclist_from_rsync)(tmp_synclist());
+
+def ckFileSize(file):
+ if os.path.getsize(file)==0:
+ print >>sys.stderr, "Empty file = %s" % file;
+ sys.exit(-1);
+
+ckFileSize(tmp_synclist());
+
+# translate the files generated by rsync or command-line option into
+# a format which can be recognized by multicaster.
+cmd = ' '.join([bin(binDir, translate),
+ tmp_synclist(), sourcePath, '>', syncFileList]);
+if verbose>=1: print >>sys.stderr, cmd;
+os.system(cmd);
+
+ckFileSize(syncFileList);
+
+# ------------ invoke multicatcher on all target machines
+def gen_catcher_cmd(count):
+ "return mulitcatcher_command to be invoked on target machines"
+ return ' '.join(filter(lambda x: x,
+ [bin(rBinDir, catcher),
+ '-t', targetPath,
+ '-i', '%d'%count, # machine id
+ (mcast_addr and '-A '+mcast_addr or ''),
+ (port>0 and '-P %d'%port or ''),
+ (mcast_if and '-I %s'%mcast_if or ''),
+ '</dev/null 1>/dev/null 2>%s &' % catcher_err_log])); ### workaround ssh problem
+
+def invoke_catcher(ms, count, bads):
+ "invoke catcher for each machine in ms, return bad_machines in bads"
+ if not ms: return bads;
+
+ machine = ms.pop(0);
+
+ if (not cmdToTarget.isMachineOK(reshell, machine)):
+ if verbose>=1: print >>sys.stderr, "***%s is down" % machine;
+ bads.append(machine);
+ return invoke_catcher(ms, count, bads);
+
+ cmd = gen_catcher_cmd(count);
+ if count==0 and verbose>=1: print >>sys.stderr,cmd;
+ status, output = cmdToTarget.docmd(reshell, machine, cmd);
+
+ if (not status):
+ if verbose>=1: print >>sys.stderr, "***remote shell exec failed for %s" % machine;
+ bads.append(machine);
+ return invoke_catcher(ms, count, bads);
+
+ if verbose>=1: print >>sys.stderr, 'id:%3d %s' % (count, machine);
+ open(goodTargetsFile, 'a').write('%s\n' % machine)
+ return invoke_catcher(ms, count+1, bads);
+
+printTimeMsg("Invoking multicatcher on all %d machines..." % len(machines));
+cleanup(goodTargetsFile);
+badMachines = invoke_catcher(machines, 0, []);
+
+# -------------- invoke multicast on the master machine
+printTimeMsg('Starting multicasting...');
+prMulticastLog('start multicast on %s' % host);
+
+def gen_caster_cmd():
+ "return mulitcaster_command to be invoked on this host (master machine)"
+ return ' '.join(filter(lambda x: x,
+ [bin(binDir, caster),
+ '-v %d' % verbose,
+ '-m %s' % goodTargetsFile,
+ '-s %s' % sourcePath,
+ '-f %s' % syncFileList,
+ (ack_wait_times>0 and '-w %d'% ack_wait_times or ''),
+ (mcast_addr and '-A '+mcast_addr or ''),
+ (port>0 and '-P %d'%port or ''),
+ (ttl>1 and '-T %d'%ttl or ''),
+ (loop and '-L' or ''),
+ (mcast_if and '-I %s'%mcast_if or ''),
+ (without_monitor and '-x' or '')]));
+
+cmd = gen_caster_cmd();
+if verbose>=1: print cmd;
+ex_code = os.system(cmd);
+print >>sys.stderr, 'ex_code= ', ex_code;
+
+# -------------- to exit
+printTimeMsg('ALL DONE.');
+prMulticastLog('multicast ends on %s' % host);
+sys.exit(ex_code);
diff --git a/mrsync_config.py b/mrsync_config.py
new file mode 100644
index 0000000..5953e24
--- /dev/null
+++ b/mrsync_config.py
@@ -0,0 +1,67 @@
+# configuration file
+
+# used by mrsync.py to find the executables multicaster, multicatcher, trFilelist
+# hopefully, all machines put them in the same place as on the master machine.
+binDir = '/a/path/name'; # bin
+rBinDir = binDir; # for remote target machines.
+
+# This is a general bookkeeping file for mrsync.py.
+# It records start_time and end_time of a mrsync session.
+# Each mrsync.py appends time info into this file.
+# So it should be a path that can be accessed by any machine invoking mrsync.py
+
+multicast_log = '/path/multicast.log';
+
+#---------------------------------------------------------------------------
+# The following three file-names are for tmp storage
+#
+# To allow for multiple multicast sessions at the same time, the actual
+# name of the file will be that specified here plus a suffix = .mmddhhmmss
+# The latter (mmddhhmm) is generated by mrsync.py
+# --> It is necessary to clean up old files :))
+#
+
+# This is a path on all target machines for a running multicatcher
+# to catch its stderr output if any. Usually we don't expect any output in this file.
+# It's there for debugging purpose in case any error event occurs.
+#
+# Set it to /dev/null, if we don't want any output.
+#
+catcher_err_log = '/a_path/catcher.err';
+
+# The goodTargetsFile is for mrsync.py to output names of the machines that are accessible.
+# It is read by multicaster.
+caster_log_dir = '/A_PATH/sync/';
+goodTargetsFile = caster_log_dir + 'syncing';
+
+# The file path for storing list of to-be-transfered files
+# It is used by multicaster.
+syncFileList = caster_log_dir + 'syncfile.lst';
+#
+# ---------------------------------------------------------------------------
+#
+
+# Path to find the rsync binary
+rsyncPath = '/usr/local/bin/rsync';
+remote_rsyncPath = rsyncPath;
+
+#
+reshell = "rsh"; # can be changed thru command line option
+
+#
+# Do NOT change the following unless you know what you are doing.
+#
+# This is the absolute minimum of rsync options for multicaster
+# rsync is used only to find files that need to be synced.
+# -- to add other options such as --exclude --include, use command line option
+# those extra options will be appended to this min_rsync_opt
+min_rsync_opt = '--rsh=%s -avW --dry-run --delete';
+
+#
+# varioius binary codes used
+#
+# the binary to translate file_list generated by rsync or manually for multicaster
+translate = 'trFilelist';
+catcher = 'multicatcher';
+caster = 'multicaster';
+codes = [ translate, catcher, caster];
diff --git a/multicaster.c b/multicaster.c
new file mode 100644
index 0000000..3b26cae
--- /dev/null
+++ b/multicaster.c
@@ -0,0 +1,582 @@
+/*
+ Copyright (C) 2006-2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ version 3.0 major update
+ -- large file support
+ -- platform independence (between linux, unix)
+ -- backup feature (as in rsync)
+ -- removing meta-file-info
+ -- catching slow machine as the feedback monitor
+ -- mcast options
+ version 3.0.[1-9] bug fixes
+ -- logic flaw which under certain condition
+ caused premature dropout due to
+ unsuccessful EOF, CLOSE_FILE
+ and caused unwanranted SIT-OUT cases.
+ -- tested on Debian 64 bit arch by Nicolas Marot in France
+ version 3.1.0
+ -- codes for IPv6 are ready (but not tested)
+ IPv4 is tested ok.
+ version 3.2.0
+ -- monitor change improvement
+ -- handshake improvement (e.g. seq #)
+ -- if one machine skips a file, all will NOT close()
+ version 4.0 major update
+ -- consolidate sending missing pages in complaint flow
+ cutting the messages by one order magnitude
+ -- exit code adjustment
+
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <stdlib.h>
+#include <limits.h> /* to define PATH_MAX */
+#include <sys/times.h>
+#include <setjmp.h>
+#include <signal.h>
+
+extern int verbose;
+extern char * machine_list_file; /* defined in complaints.c */
+extern char * bad_machines; /* array of size nMachines, defined in complaints.c */
+extern char * file_received;
+extern int * missing_pages;
+extern int backup;
+extern int nPattern;
+extern char * pattern_baseDir;
+extern int nTargets;
+extern int my_ACK_WAIT_TIMES;
+extern int toRmFirst;
+extern int quitWithOneBad;
+extern int skip_count;
+extern int file_changed;
+
+char * my_MCAST_ADDR = MCAST_ADDR;
+int my_FLOW_PORT = FLOW_PORT;
+int my_PORT = PORT;
+int my_TTL = MCAST_TTL;
+int my_LOOP = MCAST_LOOP;
+char * my_IFname = MCAST_IF;
+
+int monitorID = -1;
+
+int no_feedback_count;
+
+void usage()
+{
+ fprintf(stderr,
+ "multicaster (to copy files to many multicatchers) version %s)\n"
+ " Option list:\n"
+ " [ -v <verbose_level 0-2> ]\n"
+ " [ -w <ack_wait_times default= %d (secs) ]\n"
+ " [ -X toRmFirst_flag default= cp2tmp_and_mv ]\n"
+ " [ -q quit_with_1_bad defalut= quit_with_all_bad ]\n"
+ " -------- essential options ----------------------------------\n"
+ " -m <machine_list_filePath>\n"
+ " -s <data_source_path>\n"
+ " -f <synclist_filePath>\n"
+ " -------- options for backup ---------------------------------\n"
+ " [ -b flag to turn on backup ]\n"
+ " [ -r <filepath> for regex patterns for files needing backup ]\n"
+ " [ -d <basedir> for regex patterns ]\n"
+ " -------- mcast options --------------------------------------\n"
+ " [ -A <my_mcast_address default=%s> **same as for multicatcher ]\n"
+ " [ -P <my_PORT default=%d> **same as for multicatcher ]\n"
+ " [ -T <my_TTL default=%d> ]\n"
+ " [ -L flag turn on mcast_LOOP. default is off ]\n"
+ " [ -I <my_MCAST_IF default=NULL> ]\n",
+ VERSION, my_ACK_WAIT_TIMES, MCAST_ADDR, PORT, my_TTL);
+}
+
+
+int monitor_cmd(int cmd, int machineID)
+{
+ /*
+ Once this fx is called the old monitor will be
+ turned off. So, we need to make sure this fx
+ returns TRUE for a machine. Or the monitor_set_up
+ should fail.
+ */
+ int count =0, resp;
+ set_delay(0, FAST);
+ send_cmd(cmd, machineID);
+ while (1) { /* wait for ack */
+ resp = read_handle_complaint(cmd);
+ if (resp<0) { /* time-out */
+ ++count;
+ send_cmd(cmd, machineID);
+ if (count > SET_MON_WAIT_TIMES) {
+ return FALSE;
+ }
+ } else if (resp==1) { /* got ack */
+ return TRUE;
+ }
+ /* irrevelant resp -- continue */
+ }
+}
+
+int find_max_missing_machine(char *flags)
+{
+ /* flags[] -> which machines are not considered */
+ int i, index = -1;
+ int max = -1;
+ int threshold;
+
+ for(i=0; i<nTargets; ++i) {
+ int missing;
+ /** missing = total_missing_page[i]; **/
+ missing = missing_pages[i]; /* for last file (OPEN) or this file (other cmds)*/
+ if (missing > max && flags[i] == GOOD_MACHINE) {
+ max = missing;
+ index = i;
+ }
+ }
+
+ threshold = (get_nPages() > SWITCH_THRESHOLD*100) ?
+ get_nPages() / 100 : SWITCH_THRESHOLD;
+ if (index>=0) {
+ if (monitorID>=0) {
+ if (flags[monitorID]==BAD_MACHINE) return index;
+ /** if (max <= (total_missing_page[monitorID] + threshold)) **/
+ if (max <= (missing_pages[monitorID] + threshold))
+ return monitorID;
+ }
+ return index;
+ } else { /* could come here if all are busy*/
+ return -1;
+ }
+
+ /**
+ return ((index >= 0) &&
+ (monitorID >= 0) &&
+ (bad_machines[monitorID] == GOOD_MACHINE) &&
+ (max <= (total_missing_page[monitorID] + threshold))) ?
+ monitorID : index;
+ **/
+}
+
+void set_monitor(int mid)
+{
+ /* one machine is to be set. So need to succeed. */
+ if (monitor_cmd(SELECT_MONITOR_CMD, mid)) {
+ if (verbose >=1) fprintf(stderr, "Monitor - %s\n", id2name(mid));
+ return;
+ } else {
+ fprintf(stderr, "Fatal: monitor %s cannot be set up!\n", id2name(mid));
+ send_done_and_pr_msgs(-1.0, -1.0);
+ exit(BAD_EXIT);
+ }
+}
+
+void check_change_monitor(int undesired_index)
+{
+ /* this function changes the value of the global var: monitorID */
+ int i, count;
+ char * flags;
+
+ /* if all targets received the file, no need to go on */
+ if ((count=nNotRecv())==0) return;
+
+ if (count==1) {
+ i = iNotRecv();
+ if (bad_machines[i] == GOOD_MACHINE) {
+ monitorID = i;
+ /*
+ 'i' could be the current monitor.
+ We'd like to set it because there might
+ be something wrong with it if we come to
+ to this point.
+ */
+ set_monitor(monitorID);
+ }
+ return;
+ }
+
+ /* more than two machines do not receive the file yet */
+ /* flags mark those machines as BAD which we don't want to consider */
+ flags = malloc(nTargets * sizeof(char));
+ for(i=0; i<nTargets; ++i) {
+ flags[i] = (i == undesired_index || file_received[i] == FILE_RECV) ?
+ BAD_MACHINE : bad_machines[i];
+ }
+
+ /* check if all machines are not to be considered */
+ count = 0;
+ for(i=0; i<nTargets; ++i) {
+ if (flags[i]==BAD_MACHINE) ++count;
+ }
+ if (count==nTargets) {
+ /* Two ways to get here are
+ (1) during do_one_page:
+ prev_monitor = (not_recv) and other not_recv's are bad_machines.
+ (2) during after-ack:
+ prev_monitor = (recv) and other not_recv's are bad_machines.
+ */
+ free(flags);
+ set_monitor(monitorID);
+ return;
+ }
+
+ /* at least one not_recv (and good) machine is to be considered */
+ count = 0;
+ while (count < nTargets) {
+ i = find_max_missing_machine(flags);
+ /* if (i==monitorID) break;*/
+ monitorID = i;
+
+ if (monitorID < 0) break;
+
+ if (monitor_cmd(SELECT_MONITOR_CMD, monitorID)) {
+ if (verbose >=1) fprintf(stderr, "Monitor = %s\n", id2name(monitorID));
+ break;
+ } else {
+ flags[monitorID] = BAD_MACHINE;
+ /* Then, we attemp to set up other machine */
+ }
+ ++count;
+ }
+
+ free(flags);
+ if (monitorID < 0) {
+ fprintf(stderr, "Fatal: monitor machine cannot be set up!\n");
+ send_done_and_pr_msgs(-1.0, -1.0);
+ exit(BAD_EXIT);
+ }
+}
+
+void do_one_page(int page)
+{
+ int resp;
+ unsigned long rtt;
+ refresh_timer();
+ start_timer();
+ if (!send_page(page)) return;
+
+ /* first ignore all irrelevant resp */
+ resp=read_handle_complaint(SENDING_DATA);
+ while (resp==0) {
+ resp=read_handle_complaint(SENDING_DATA);
+ }
+
+ /* read_handle_complaint() waits n*interpage_interval at most */
+ if (resp==-1) {
+ /* delay_sec for readable() is set by set_delay() */
+ /******
+ at this point, the readable() returns without getting a reply
+ from monitorID after FACTOR*DT_PERPAGE (or DT_PERPAGE if without_monitor)
+ ****/
+ ++no_feedback_count;
+ if (verbose>=2) printf("no reply, count = %d\n", no_feedback_count);
+ update_rtt_hist(999999);
+ /* register this page as rtt = infinite --- the last element in rtt_hist */
+
+ if (no_feedback_count > NO_FEEDBACK_COUNT_MAX) {
+ /* switch to another client */
+ if (verbose >=2)
+ fprintf(stderr,
+ "Consecutive non_feedback exceeds limit, Changing monitor machine.\n");
+ /* if (nTargets>1 && (nTargets - nBadMachines()) > 1 && nNotRecv() > 1) */
+ check_change_monitor(monitorID); /* replace the current monitor */
+ no_feedback_count = 0;
+ }
+ return;
+ } else { /* resp == 1 */
+ end_timer();
+ update_time_accumulator();
+ rtt = get_accumulated_usec();
+ /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */
+ /* to do: update histogram */
+ if (verbose >=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt);
+ update_rtt_hist(rtt);
+
+ no_feedback_count = 0;
+ }
+}
+
+void send_cmd_and_wait_ack(int cmd_code)
+{
+ send_cmd(cmd_code, (int) ALL_MACHINES);
+ refresh_machine_status();
+ /*set_delay(0, FAST);*/
+ set_delay(0, DT_PERPAGE*FACTOR);
+ if (cmd_code==EOF_CMD) mod_machine_status();
+ wait_for_ok(cmd_code);
+ do_badMachines_exit();
+ /* check_change_monitor(-1); */
+}
+
+int do_file_changed_skip()
+{
+ /* if file is changed during syncing, then we should skip this file */
+ if (file_changed || !same_stat_for_file()) {
+ fprintf(stderr, "WARNING: file is changed during sycing -- skipping\n");
+ send_cmd_and_wait_ack(CLOSE_ABORT_CMD);
+ free_missing_page_flag();
+ ++skip_count;
+ return TRUE;
+ }
+ return FALSE;
+}
+
+int main(int argc, char *argv[])
+{
+ int c;
+ int cfile, ctotal_pages, cpage;
+ char * source_path = NULL;
+ char * synclist_path = NULL;
+ char * machine_list_file = NULL;
+ time_t tloc;
+ time_t time0, time1, t_page0, t_page;
+
+ while ((c = getopt(argc, argv, "v:w:A:P:T:LI:m:s:f:br:d:Xq")) != EOF) {
+ switch (c) {
+ case 'v':
+ verbose = atoi(optarg);
+ break;
+ case 'w':
+ my_ACK_WAIT_TIMES = atoi(optarg);
+ break;
+ case 'A':
+ my_MCAST_ADDR = optarg;
+ break;
+ case 'P':
+ my_PORT = atoi(optarg);
+ my_FLOW_PORT = my_PORT -1;
+ break;
+ case 'T':
+ my_TTL = atoi(optarg);
+ break;
+ case 'L':
+ my_LOOP = TRUE;
+ break;
+ case 'I':
+ my_IFname = optarg;
+ break;
+ case 'm':
+ machine_list_file = optarg;
+ break;
+ case 's':
+ source_path = optarg;
+ break;
+ case 'f':
+ synclist_path = optarg;
+ break;
+ case 'b':
+ backup = TRUE; /* if nPattern==0, backup means back up ALL files */
+ break;
+ case 'r': /* to selectively back up certain files as defined in the pattern */
+ if (!read_backup_pattern(optarg)) {
+ fprintf(stderr, "Failed in loading regex patterns in file = %s\n", optarg);
+ exit(BAD_EXIT);
+ }
+ break;
+ case 'd':
+ pattern_baseDir = strdup(optarg);
+ if (pattern_baseDir[strlen(pattern_baseDir)-1]=='/')
+ pattern_baseDir[strlen(pattern_baseDir)-1] = '\0' ; /* remove last / */
+ break;
+ case 'X':
+ toRmFirst = TRUE;
+ break;
+ case 'q':
+ quitWithOneBad = TRUE;
+ break;
+ case '?':
+ usage();
+ exit(BAD_EXIT);
+ }
+ }
+
+ if (!machine_list_file || !source_path || !synclist_path ) {
+ fprintf(stderr, "Essential options (-m -s -f) should be specified. \n");
+ usage();
+ exit(BAD_EXIT);
+ }
+
+ if (nPattern>0) backup = TRUE;
+ if (backup && nPattern>0) {
+ if (!pattern_baseDir) pattern_baseDir = strdup(source_path);
+ if (strlen(source_path) < strlen(pattern_baseDir) ||
+ strncmp(source_path, pattern_baseDir, strlen(pattern_baseDir))!=0) {
+ fprintf(stderr,
+ "src_path (%s) should include (and be longer than) pattern_baseDir (%s)",
+ source_path, pattern_baseDir);
+ exit(BAD_EXIT);
+ }
+ }
+
+ if (backup && toRmFirst) {
+ fprintf(stderr, "-B and -X cannot co-exist\n");
+ exit(BAD_EXIT);
+ }
+
+ get_machine_names(machine_list_file);
+ if (nTargets==0) {
+ fprintf(stderr, "No target to sync to\n");
+ exit(GOOD_EXIT);
+ }
+
+ if (!init_synclist(synclist_path, source_path)) exit(BAD_EXIT);
+
+ if (total_entries()==0) {
+ fprintf(stderr, "Nothing to sync in %s\n", synclist_path);
+ exit(GOOD_EXIT);
+ }
+
+ if (verbose >= 2)
+ fprintf(stderr, "Total number of files: %d\n", total_entries());
+
+ /* init the network stuff and some flags */
+ init_sends();
+ init_complaints();
+ init_machine_status(nTargets);
+
+ /* set up Cntl_C catcher */
+ Signal(SIGINT, do_cntl_c);
+
+ /* ------------------- set up monitor machine for doing feedback for each page sent */
+ check_change_monitor(-1);
+
+ /*-------------------------------------------------------------------------------*/
+
+ init_rtt_hist();
+ time0 = time(&tloc); /* start time */
+ t_page = 0; /* total time for sending pages */
+
+ /* -----------------------------Send the file one by one -----------------------------------*/
+ for (cfile = 1; cfile <= total_entries(); cfile++) { /* for each file to be synced */
+ if (!get_next_entry(cfile)) continue;
+
+ ctotal_pages = pages_for_file();
+
+ /*
+ By the time this file, which was obtained when synclist was
+ established some time ago, may no longer exist on the master.
+ So, we need to check the existence of this file.
+ fexist() also opens the file so that it won't be deleted
+ between here and the send-page-loop.
+ */
+ if (ctotal_pages > 0 && (!same_stat_for_file() ||
+ !fexist(current_entry()))) {
+ /* go to next file if this file has changed or does not exist */
+ fprintf(stderr, "%s (%d out of %d; Extinct file)\n",
+ getFilename(), current_entry(), total_entries());
+ adjust_totals();
+ continue;
+ }
+
+ if (ctotal_pages < 0) {
+ fprintf(stderr, "%s (%d out of %d; to delete)\n",
+ getFilename(), current_entry(), total_entries());
+ } else {
+ fprintf(stderr, "%s (%d out of %d; %d pages)\n",
+ getFilename(), current_entry(), total_entries(), ctotal_pages);
+ }
+
+ /* send_open_cmd */
+ pack_open_file_info();
+ send_cmd_and_wait_ack(OPEN_FILE_CMD);
+
+ /*
+ ctotal_pages < 0, for deletion
+ ctotal_pages = 0, regular file with no content.
+ or directory, softlink, hardlink
+ both should have been finished with OPEN_FILE_CMD
+ */
+ if (ctotal_pages <= 0) continue;
+
+ /* for other regular files */
+ init_missing_page_flag(ctotal_pages);
+ refresh_missing_pages(); /* total missing pages for this file for each tar */
+
+ /* ----- sending file data ----- first round */
+ t_page0 = time(&tloc);
+ no_feedback_count = 0;
+ for (cpage = 1; cpage <= ctotal_pages; cpage++) {
+ /*
+ the mode field and delay may be changed by change_monitor
+ */
+ set_delay(0, DT_PERPAGE*FACTOR);
+ set_mode(SENDING_DATA);
+ do_one_page(cpage);
+ }
+
+ if (do_file_changed_skip()) continue;
+
+ /* send "I am done with the first round" */
+ reset_has_missing();
+ refresh_file_received(); /* to record machines that have received this file */
+ send_cmd_and_wait_ack(EOF_CMD);
+
+ /* after the first run, before we go to 2nd and 3rd run, */
+ if (has_missing_pages()) check_change_monitor(-1);
+
+ /* ----- sending file data again, 2nd and 3rd and ...n-th round */
+ reset_has_sick();
+ while (has_missing_pages()) {
+ int c; /****************/
+ no_feedback_count = 0;
+
+ c = 0;
+ for (cpage = 1; cpage <= ctotal_pages; cpage++){
+ if (is_it_missing(cpage-1)) {
+ set_delay(0, DT_PERPAGE*FACTOR);
+ set_mode(RESENDING_DATA);
+ do_one_page(cpage);
+ page_sent(cpage-1);
+ ++c; /*************/
+ }
+ }
+ if (verbose>=1)
+ fprintf(stderr, "re-sent N_pages = %d\n", c); /*************/
+
+ /* eof */
+ reset_has_missing();
+ send_cmd_and_wait_ack(EOF_CMD);
+ if (has_sick_machines()) {
+ break;
+ /* one machine can reach sick_state while some others are still
+ in missing_page state.
+ This break here is ok in terms of skipping this file.*/
+ } else {
+ check_change_monitor(-1);
+ }
+ };
+
+ t_page += (time(&tloc) - t_page0);;
+ if (do_file_changed_skip()) continue;
+
+ /* close file */
+ send_cmd_and_wait_ack((has_sick_machines()) ? CLOSE_ABORT_CMD : CLOSE_FILE_CMD);
+ if (has_sick_machines()) {
+ fprintf(stderr, "Skip_syncing %s\n", getFilename());
+ }
+ free_missing_page_flag();
+ } /* end of the for each_file loop */
+
+ time1= time(&tloc);
+ return send_done_and_pr_msgs( ((double)(time1 - time0))/ 60.0, ((double)t_page)/60.0);
+}
+
diff --git a/multicatcher.c b/multicatcher.c
new file mode 100644
index 0000000..76fbc5e
--- /dev/null
+++ b/multicatcher.c
@@ -0,0 +1,181 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ verision 3.0 major update
+ -- large file support
+ -- platform independence (between linux, unix)
+ -- backup feature (as in rsync)
+ -- removing meta-file-info
+ -- catching slow machine as the feedback monitor
+ -- mcast options
+ version 3.0.[1-9] bug fixes
+ -- logic flaw which under certain condition
+ caused premature dropout due to
+ unsuccessful EOF, CLOSE_FILE
+ and caused unwanranted SIT-OUT cases.
+ -- tested on Debian 64 bit arch by Nicolas Marot in France
+ version 3.1.0
+ -- codes for IPv6 are ready (but not tested)
+ IPv4 is tested ok.
+ version 3.2.0
+ -- monitor change improvement
+ -- handshake improvement (e.g. seq #)
+ -- if one machine skips a file, all will NOT close()
+
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+
+extern int machineID;
+extern int verbose;
+extern int isMonitor;
+extern char * baseDir;
+extern char * cmd_name[];
+extern char * backup_suffix;
+
+char * my_MCAST_ADDR = MCAST_ADDR;
+char * my_IFname = MCAST_IF;
+int my_FLOW_PORT = FLOW_PORT;
+int my_PORT = PORT;
+
+void usage()
+{
+ fprintf(stderr,
+ "multicatcher (to receive files from multicaster) version %s\n"
+ " Option list:\n"
+ " [ -v <verbose_level 0-2> ]\n"
+ " -------- essential options ----------------------------------\n"
+ " -t <destination Dirpath>\n"
+ " -i <machineID 0 originated id numbers>\n"
+ " -------- options for backup ---------------------------------\n"
+ " [ -u <suffix> for backup files if -b is on in multicaster ]\n"
+ " -------- mcast options --------------------------------------\n"
+ " [ -A <my_mcast_address default=%s)> **same as for multicaster ]\n"
+ " [ -P <my_PORT default=%d> **same as for multicaster ]\n"
+ " [ -I <my_MCAST_IF default=NULL> ]\n",
+ VERSION, MCAST_ADDR, PORT);
+}
+
+int main(int argc, char *argv[])
+{
+ int old_mode; /* hp: from char to int for mode */
+ int mode;
+ int c;
+
+ while ((c = getopt(argc, argv, "v:A:P:t:i:u:I:")) != EOF) {
+ switch (c) {
+ case 'v':
+ verbose = atoi(optarg);
+ break;
+ case 'A':
+ my_MCAST_ADDR = optarg;
+ break;
+ case 'P':
+ my_PORT = atoi(optarg);
+ my_FLOW_PORT = my_PORT -1;
+ break;
+ case 'I':
+ my_IFname = optarg;
+ break;
+ case 't':
+ baseDir = optarg;
+ break;
+ case 'i':
+ machineID = atoi(optarg);
+ break;
+ case 'u':
+ backup_suffix = strdup(optarg);
+ break;
+ case '?':
+ usage();
+ exit(BAD_EXIT);
+ }
+ }
+
+ if (machineID < 0 || !baseDir) {
+ fprintf(stderr, "Essential options (-t -i) should be specified. \n");
+ usage();
+ exit(BAD_EXIT);
+ }
+
+ fprintf(stderr, "my_pid= %lu\n", getpid());
+
+ if (!backup_suffix) default_suffix();
+ get_tmp_suffix(); /* get a unique tmp_name for the tmp file */
+
+ init_page_reader();
+ init_complaint_sender();
+
+ /* initialize random numbers */
+ srand(time(NULL) + getpid());
+
+ /* set the timeout for readable() to be about 3 to 6 seconds
+ Actually, this setting is arbitrary.
+ The timeout of readable() does not play a role in
+ the logic flow.
+ */
+ set_delay( 3 + rand() % 6, 0);
+ mode = old_mode = TEST;
+
+ /* -----------------------The main loop---------------------------
+ Multicatcher simply waits for any incoming UDP,
+ reads and handles it.
+ If the UDP contains file content, it is placed in the right place.
+ If the UDP contains an instruction, it is carried out.
+
+ Multicatcher never complains unless being told so.
+ For example, as it is now, multicatcher does not complain
+ about the rate of incoming UDP being too fast to handle.
+ If multicatcher cannot keep up with the speed, it just
+ loses certain pages in a file which will be reported
+ later when multicaster requests acknowledgement.
+ ---------------------------------------------------------------- */
+ while(1) { /* loop for all incoming pages */
+ if (verbose>=2)
+ fprintf(stderr, "Starting listen loop with mode %d, old_mode = %d\n", mode, old_mode);
+
+ /* the major task here */
+ mode = read_handle_page();
+ if (verbose>=2) fprintf(stderr, "new page in mode %d\n", mode);
+
+ if (mode == ALL_DONE_CMD) break;
+
+ /* for debugging purpose */
+ if ((old_mode != mode)) {
+ if (verbose>=2 && mode <= 5) fprintf(stderr, "%s\n", cmd_name[mode]);
+ }
+
+ /* got no data? */
+ if (mode == TIMED_OUT) {
+ if (verbose>=2) fprintf(stderr, "*");
+ }
+
+ old_mode = mode;
+ } /* end of incoming page loop */
+
+ if (verbose>=1) fprintf(stderr, "Done!\n");
+ return 0;
+}
diff --git a/page_reader.c b/page_reader.c
new file mode 100644
index 0000000..8f349a0
--- /dev/null
+++ b/page_reader.c
@@ -0,0 +1,426 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+
+/* the following is needed on Sun but not on linux */
+#ifdef _SUN
+#include <sys/filio.h>
+#endif
+
+extern int machineID;
+extern int verbose;
+extern char * my_MCAST_ADDR; /* defined in multicatcher.c */
+extern char * my_IFname;
+extern int my_PORT;
+extern unsigned int current_file_id;
+
+int isMonitor; /* flag = if this target machine is a designated monitor */
+int nPage_recv; /* counter for the number of pages received for a file */
+int machineState; /* there are four states during one file transmission */
+int isFirstPage=TRUE; /* flag */
+
+/* The followings are used to determine sick condition */
+int current_missing_pages;
+int last_missing_pages;
+int sick_count;
+
+/* receive socket */
+int recfd;
+#ifndef IPV6
+struct sockaddr_in rec_addr;
+#else
+struct sockaddr_in6 rec_addr;
+#endif
+
+/*
+ Receive buffer for storing the data obtained from UDP
+ The format:
+ (5*sizeof(int) bytes header) + (PAGE_SIZE data area)
+
+ The header has five int_type (4 bytes) int's.
+ (1) mode -- for master to give instructions to the target machines.
+ (2) current file index (starting with 1)
+ (3) current page index (starting with 1)
+ (4) bytes that has been sent in this UDP page
+ (5) total number of pages.
+
+ data_ptr points to the data area.
+*/
+int *mode_ptr; /* hp: change from char to int */
+int *total_pages_ptr;
+int *current_page_ptr;
+int *bytes_sent_ptr, *current_file_ptr;
+char *data_ptr;
+char rec_buf[PAGE_BUFFSIZE];
+
+void init_page_reader()
+{
+ /*struct ip_mreq mreq;*/
+ int rcv_size;
+
+ machineState = IDLE_STATE;
+ isMonitor = FALSE;
+
+ /* Prepare buffer pointers */
+ mode_ptr = (int*)rec_buf;
+ current_file_ptr = (int *)(mode_ptr + 1);
+ current_page_ptr = (int *)(current_file_ptr + 1);
+ bytes_sent_ptr = (int *)(current_page_ptr + 1);
+ total_pages_ptr = (int *)(bytes_sent_ptr + 1);
+ data_ptr = (char *)(total_pages_ptr + 1);
+
+ /* Set up receive socket */
+ if (verbose>=2) fprintf(stderr, "setting up receive socket\n");
+ recfd = rec_socket(&rec_addr, my_PORT);
+
+ /* Join the multicast group */
+ /*inet_pton(AF_INET, MCAST_ADDR, &(mreq.imr_multiaddr.s_addr));
+ mreq.imr_multiaddr.s_addr = inet_addr(my_MCAST_ADDR);
+ mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+
+ if (setsockopt(recfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void*)&mreq, sizeof(mreq)) < 0){
+ perror("Joining Multicast Group");
+ }
+ */
+
+ if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) {
+ perror("Joining Multicast Group");
+ }
+
+ /* Increase socket receive buffer */
+ rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE;
+ if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){
+ perror("Expanding receive buffer for page_reader");
+ }
+
+}
+
+/*
+ This is the heart of multicatcher.
+ It parses the incoming pages and do proper reactions according
+ to the mode (command code) encoded in the first 4 bytes in an UDP page.
+ It returns the mode.
+*/
+int read_handle_page()
+{
+ #ifndef IPV6
+ struct sockaddr_in return_addr;
+ #else
+ struct sockaddr_in6 return_addr;
+ #endif
+ int bytes_read;
+ socklen_t return_len = (socklen_t)sizeof(return_addr);
+ int mode_v, bytes_sent_v, total_pages_v, current_file_v, current_page_v;
+
+ /* -----------receiving data -----------------*/
+ if (readable(recfd) == 1) { /* there is data coming in */
+ /* check_queue(); This might be useful. But need more study. */
+ /* get data */
+ bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0,
+ (struct sockaddr *)&return_addr,
+ (socklen_t*) &return_len);
+
+ bytes_sent_v = ntohl(*bytes_sent_ptr);
+ if (bytes_read != (bytes_sent_v + HEAD_SIZE))
+ return NULL_CMD;
+
+ /* convert from network byte order to host byte order */
+ mode_v = ntohl(*mode_ptr);
+ total_pages_v = ntohl(*total_pages_ptr);
+ current_file_v = ntohl(*current_file_ptr);
+ current_page_v = ntohl(*current_page_ptr);
+
+ if (isFirstPage) {
+ update_complaint_address(&return_addr);
+ isFirstPage = FALSE;
+ }
+
+ /* --- process various commands (modes) */
+ switch (mode_v) {
+ case TEST:
+ /* It is just a test packet? */
+ fprintf(stderr, "********** Received test packet **********\n");
+ return mode_v;
+
+ case SELECT_MONITOR_CMD:
+ if (current_page_v == machineID) {
+ isMonitor = TRUE;
+ send_complaint(MONITOR_OK, machineID, 0, 0);
+ } else {
+ isMonitor = FALSE;
+ }
+ return mode_v;
+
+ case OPEN_FILE_CMD:
+ if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)
+ && machineState == IDLE_STATE) {
+ /* get info about this file */
+ if (verbose>=1)
+ fprintf(stderr, "open file id= %d\n", current_file_v);
+ if (!extract_file_info(data_ptr, current_file_v, total_pages_v))
+ return mode_v;
+
+ /* different tasks here */
+ /* open file, rmdir, unlink */
+ if (total_pages_v < 0) { /* delete a file or a directory */
+ if (!delete_file(TRUE)) return mode_v; /* this fx can be re-entered many times */
+ /* machineState remains to be IDLE_STATE */
+ } else if (total_pages_v == 0) {
+ /* handle an empty file, or (soft)link or directory */
+ if (!check_zero_page_entry()) return mode_v; /* can re-enter many times */
+ /* machineState remains to be IDLE_STATE */
+ } else { /* a regular file */
+ if (!open_file()) return mode_v;
+ /* the file has been opened. */
+ sick_count = 0;
+ current_missing_pages =0;
+ last_missing_pages = nPages_for_file(current_file_v);
+ machineState = GET_DATA_STATE;
+ }
+ send_complaint(OPEN_OK, machineID, 0, current_file_id); /* ack */
+ nPage_recv = 0;
+ return mode_v;
+ }
+ /*
+ We must be in GET_DATA_STATE.
+ OPEN_OK ack has been sent back in the previous block.
+ However,
+ the master may not have received the ack.
+ In that case the master will send back the open_file_cmd again.
+ */
+ if ((current_page_v == (int) ALL_MACHINES ||current_page_v == (int) machineID)
+ && (machineState == GET_DATA_STATE)) { /****/
+ send_complaint(OPEN_OK, machineID, 0, current_file_id);
+ }
+ return mode_v;
+
+ case EOF_CMD:
+ /**********/
+ if (verbose>=1)
+ fprintf(stderr, "***** EOF received for id=%d state=%d id=%d, file=%d\n",
+ current_page_v, machineState, machineID, current_file_v);
+
+ /* the following happnens when this machine was previously out-of-pace
+ and was labeled as 'BAD MACHINE' by the master.
+ The master has proceeded with the syncing process without
+ waiting for this machine to finish the process in one of the previous files.
+ Since under normal condition, this machine should not expect to see
+ current_file changes except when OPEN_FILE_CMD is received.
+ */
+ if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
+
+ /* normal condition */
+ if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)
+ && machineState == GET_DATA_STATE) { /* GET_DATA_STATE */
+ /* check missing pages and send back missing-page-request */
+ current_missing_pages = ask_for_missing_page(); /* = total # of missing_pages */
+
+ if (current_page_v == (int) machineID) {
+ /* master is asking for my EOF_ack */
+ if (current_missing_pages == 0) {
+ /* w/o assuming how we get to this point ... */
+ machineState = DATA_READY_STATE;
+ send_complaint(EOF_OK, machineID, 0, current_file_id);
+ } else {
+ send_complaint(MISSING_TOTAL, machineID,
+ current_missing_pages , current_file_id);
+ missing_page_stat(); /* this has to be done before close_file() ******************/
+ }
+ return mode_v;
+ }
+
+ /***
+ master is asking everyone, (after master sent or re-sent pages)
+ so, we do some book-keeping procedures (incl state change)
+ ***/
+ if (verbose >=1)
+ fprintf(stderr, "missing_pages = %d, nPages_received = %d file = %d\n",
+ current_missing_pages, nPage_recv, current_file_v); /************/
+
+ nPage_recv = 0;
+
+ if (current_missing_pages == 0) {
+ /*
+ There is no missing page.
+ Change the state.
+ */
+ machineState = DATA_READY_STATE;
+ send_complaint(EOF_OK, machineID, 0, current_file_id);
+ return mode_v;
+ } else {
+ /*
+ There are missing pages.
+ If we still miss many pages for SICK_THRESHOLD consecutive times,
+ then we are sick. e.g. machine CPU does not give multicatcher
+ enough time to process incoming UDP's OR the disk I/O is too slow.
+ */
+
+ if ((SICK_RATIO)*(double)last_missing_pages < (double)current_missing_pages) {
+ ++sick_count;
+ if (sick_count > SICK_THRESHOLD) {
+ machineState = SICK_STATE;
+ send_complaint(SIT_OUT, machineID,
+ 0, current_file_id); /* no more attempt to receive */
+ /* master may send more pages from requests from other machines
+ but this machine will mark this file as 'sits out receiving' */
+ } else {
+ /* not sick enough yet */
+ send_complaint(MISSING_TOTAL, machineID,
+ current_missing_pages, current_file_id);
+ missing_page_stat(); /* this has to be done before close_file() ******************/
+ /* master will send more pages */
+ }
+ } else { /* we are getting enough missing pages this time to keep up */
+ sick_count = 0; /* break the consecutiveness */
+ send_complaint(MISSING_TOTAL, machineID,
+ current_missing_pages, current_file_id);
+ missing_page_stat(); /* this has to be done before close_file() ******************/
+ /* master will send more pages */
+ }
+ last_missing_pages = current_missing_pages;
+ return mode_v;
+ }
+ } /* end GET_DATA_STATE */
+
+ /* After state change, we still get request for ack.
+ send back ack again */
+ if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)) {
+ switch (machineState) {
+ case DATA_READY_STATE:
+ send_complaint(EOF_OK, machineID,
+ 0, current_file_id);
+ return mode_v;
+ case SICK_STATE:
+ send_complaint(SIT_OUT, machineID,
+ 0, current_file_id); /* just an ack, even for sick state*/
+ return mode_v;
+ }
+ }
+ return mode_v;
+
+ case CLOSE_FILE_CMD:
+ if (verbose>=1)
+ fprintf(stderr, "***** CLOSE received for id=%d state=%d id=%d, file=%d\n",
+ current_page_v, machineState, machineID, current_file_v);
+
+ if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
+
+ if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) {
+ if (machineState == DATA_READY_STATE) {
+ if (!close_file()) { return mode_v; };
+ set_owner_perm_times();
+ machineState = IDLE_STATE;
+ send_complaint(CLOSE_OK, machineID, 0, current_file_id);
+ return mode_v;
+ } else if (machineState == IDLE_STATE) {
+ /* send ack back again because we are asked */
+ send_complaint(CLOSE_OK, machineID, 0, current_file_id);
+ return mode_v;
+ } else { /* other states -- we should not be here*/
+ /* if (machineState == SICK_STATE || machineState == GET_DATA_STATE) */
+ /* SICK_STATE --> we are too slow in getting missing pages
+ if one of the machines is sick, master will send out CLOSE_ABORT
+ GET_DATA_STATE -->
+ We are not supposed to be in GET_DATA_STATE,
+ so consider it a sick_state */
+ fprintf(stderr, "*** should not be here -- state=%d\n", machineState);
+ if (!rm_tmp_file()) { return mode_v; };
+ machineState = IDLE_STATE;
+ send_complaint(SIT_OUT, machineID, 0, current_file_id);
+ /* make sick_count larger than threshold for GET_DATA_STATE */
+ sick_count = SICK_THRESHOLD + 10000;
+ return mode_v;
+ }
+ }
+ return mode_v;
+
+ case CLOSE_ABORT_CMD:
+ if (verbose>=1)
+ fprintf(stderr, "***** CLOSE_ABORT received for id=%d state=%d id=%d, file=%d\n",
+ current_page_v, machineState, machineID, current_file_v);
+
+ if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
+
+ if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) {
+ if (!rm_tmp_file()) { return mode_v; };
+ machineState = IDLE_STATE;
+ send_complaint(CLOSE_OK, machineID, 0, current_file_id);
+ }
+ return mode_v;
+
+ case SENDING_DATA:
+ case RESENDING_DATA:
+ if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
+ /*
+ otherwise, go ahead...
+ */
+ if (verbose>=2) {
+ fprintf(stderr, "Got %d bytes from page %d of %d for file %d mode=%d\n",
+ bytes_read - HEAD_SIZE,
+ current_page_v, total_pages_v,
+ current_file_v + 1, mode_v);
+ }
+
+ /* timing the disk IO */
+ /* start_timer(); */
+
+ write_page(current_page_v, data_ptr, bytes_read - HEAD_SIZE);
+ if (isMonitor) send_complaint(PAGE_RECV, machineID, 0, current_file_id);
+
+ /*
+ end_timer();
+ update_time_accumulator();
+ */
+
+ /* Yes, we have just read a page */
+ ++nPage_recv;
+ return mode_v;
+
+ case ALL_DONE_CMD:
+ /*
+ clear up the files.
+ */
+ /*** since we do not know if there are other machines
+ that are NOT in data_ready_state,
+ to maintain equality, it is best to just
+ remove tmp_file without close_file() ***/
+ rm_tmp_file();
+ return mode_v;
+
+ default:
+ return mode_v;
+ } /* end of switch */
+ } else {
+ /* No, the read is timed out */
+ return TIMED_OUT;
+ }
+}
+
diff --git a/parse_synclist.c b/parse_synclist.c
new file mode 100644
index 0000000..0c2a7aa
--- /dev/null
+++ b/parse_synclist.c
@@ -0,0 +1,320 @@
+/*
+ Copyright (C) 2006-2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <stdlib.h>
+#include <limits.h> /* to define PATH_MAX */
+
+/*
+ Get the next to-be-synced file from synclist which is the output of
+ parseRsyncList.C
+ The latter in turn parses the output from rsync.
+ In other words, we use rsync in dry-run mode to get the
+ files that need to be synced.
+
+ to port to large_file environment: use off_t for size
+*/
+
+extern int verbose;
+
+char basedir[PATH_MAX]; /* baseDir for the syncing */
+FILE *fd;
+struct stat st; /* stat for the current file */
+
+unsigned int nEntries;
+int cur_entry; /* file id -- <0 if it needs backup */
+unsigned int nPages;
+unsigned int last_bytes; /* number of bytes for the last page */
+int toRmFirst = FALSE; /* flag rm existing file and then sync */
+int file_changed = FALSE; /* flag to indicate if file has been changed during syncing */
+
+unsigned int total_pages;
+off_t total_bytes;
+
+int isDelete, isHardlink;
+char filename[PATH_MAX];
+char fullname[PATH_MAX];
+char linktar[PATH_MAX];
+
+int init_synclist(char * synclist_path, char *bdir)
+{
+ char line[PATH_MAX];
+ nEntries = 0;
+ strcpy(basedir, bdir);
+
+ if ((fd = fopen(synclist_path, "r")) == NULL) {
+ fprintf(stderr, "Cannot open synclist file = %s \n", synclist_path);
+ return FAIL;
+ }
+
+ while (fgets(line, PATH_MAX, fd) != NULL) nEntries++;
+ if (nEntries == 0) {
+ fclose(fd);
+ fprintf(stderr, "Empty entires in synclist file = %s\n", synclist_path);
+ return SUCCESS; /* OK, nothing to sync */
+ }
+ rewind(fd);
+
+ cur_entry = 0;
+ total_pages = 0;
+ total_bytes = 0;
+
+ return SUCCESS;
+}
+
+/*
+ pages_for_file calculates the number (int) of pages for the current file
+ and returns that number in an (int) type.
+ So, max_pages = 2**31 = 2147483648
+ which corresponds to a file_size of 2**31 * 64512 = 1.38e14
+ [ general limit = (1<<(sizeof(int)*8)) * PAGE_SIZE ]
+ At that time, the type of page_number needs to be upgraded :)
+
+ to_delete -> -1
+ normal_file -> number_of_pages
+ softlink -> 0
+ hardlink -> 0
+ directory -> 0
+*/
+int pages_for_file()
+{
+ if (isDelete) { /* to be deleted directory or file */
+ return TO_DELETE;
+ }
+
+ if (S_ISREG(st.st_mode)){
+ int n;
+ if (st.st_nlink > 1) return 0; /* hardlink file */
+
+ n = (int)((st.st_size)/(off_t)PAGE_SIZE); /* regular file */
+ if ((st.st_size)%((off_t)PAGE_SIZE) == 0) {
+ last_bytes = (unsigned int)(PAGE_SIZE);
+ return n;
+ } else {
+ last_bytes = (unsigned int)(st.st_size - (off_t)n * (off_t)PAGE_SIZE);
+ return n+1;
+ }
+ /*return ((st.st_size)%((off_t)PAGE_SIZE) == 0) ? n : n+1 ;*/
+ }
+ if (S_ISLNK(st.st_mode)){
+ return 0; /* softlink */
+ }
+ return 0; /* directory */
+}
+
+off_t bytes_for_file()
+{
+ return st.st_size;
+}
+
+unsigned int get_nPages() /* for this file */
+{
+ return nPages;
+}
+
+void strip(char * str)
+{
+ /* remove trailing \n and spaces */
+ char *pc = &str[strlen(str)-1];
+ while (*pc == ' ' || *pc == '\n') {
+ *(pc--) = '\0';
+ }
+}
+
+int same_stat_for_file()
+{
+ /* check if current stat is same as that when get_next_entry is called */
+ struct stat st1;
+
+ if(lstat(fullname, &st1) < 0) {
+ if (verbose >=1) perror(fullname);
+ return FAIL;
+ }
+
+ if (st1.st_size != st.st_size || st1.st_mode != st.st_mode ||
+ st1.st_mtime != st.st_mtime) {
+ return FAIL; /* the file has changed */
+ }
+ return SUCCESS;
+
+}
+
+int is_hardlink_line(char * line)
+{
+ /* when line is in the form of
+ string1 string2
+ it can be either a filename (string1 string2)
+ or a hardlink string1 => string2
+ */
+ struct stat st;
+ char fn[PATH_MAX];
+ strcpy(fn, basedir);
+ strcat(fn, "/");
+ strcat(fn, line);
+
+ /* if the whole line is not a file, then we are dealing with hardlink case */
+ return (lstat(fn, &st) < 0);
+ /* cannot deal with the situation
+ str1 is a file
+ str2 is a hardlink to str1
+ str1 str2 is a file
+ AND if we need to sync
+ str1 and 'str1 str2' at the same time.
+ But this situation is very rare. */
+}
+
+int get_next_entry(int current_file_id)
+{
+ char *c;
+ char line[PATH_MAX];
+
+ /* inside this function, cur_entry is set to be positve to facilitate processing */
+ cur_entry = current_file_id; /* from main loop's index, starting with 1 */
+
+ isDelete = FALSE;
+ isHardlink = FALSE;
+
+ fgets(line, PATH_MAX, fd);
+ strip(line);
+
+ if (current_file_id == nEntries) {
+ fclose(fd); /* close the synclist file */
+ if (verbose>=2) fprintf(stderr, "no more entry in synclist.\n");
+ }
+
+ if (verbose>=2) {
+ fprintf(stderr, "Got current entry = %d (total= %d)\n", cur_entry, nEntries);
+ fprintf(stderr, "%s\n", line);
+ }
+
+ strcpy(fullname, basedir);
+ strcat(fullname, "/");
+ if (strncmp(line, "deleting ", 9)==0) {
+ isDelete = TRUE;
+ nPages = -1;
+ strcat(fullname, &line[9]);
+ strcpy(filename, &line[9]);
+ if (needBackup(fullname)) cur_entry = -cur_entry;
+ return SUCCESS;
+ } else if ((c = strchr(line, ' '))!=NULL && is_hardlink_line(line)) {
+ /* is it a hardlink -- two filenames separated by a space */
+ char fn[PATH_MAX];
+ isHardlink = TRUE;
+ strncpy(fn, line, (c - line));
+ fn[c-line] = '\0';
+ strcat(fullname, fn);
+ strcpy(filename, fn);
+ strcpy(linktar, c+1);
+ } else {
+ /* normal, single entry */
+ strcat(fullname, line);
+ strcpy(filename, line);
+ }
+
+ /* update stat */
+ if(lstat(fullname, &st) < 0) {
+ if (verbose >=1) perror(fullname);
+ return FAIL;
+ }
+
+ if (S_ISLNK(st.st_mode)) {
+ int linklen;
+ linklen = readlink(fullname, linktar, PATH_MAX);
+ /* readlink doesn't null-terminate the string */
+ *(linktar + linklen) = '\0';
+ } else if (st.st_nlink>1 && !isHardlink) {
+ /* this is the target file that others (hard)link to.
+ treat it like a normal file */
+ st.st_nlink = 1;
+ }
+
+ nPages = pages_for_file();
+ if (nPages > 0) { /* for regular files */
+ total_pages += nPages;
+ total_bytes += st.st_size;
+ }
+
+ if (needBackup(fullname)) cur_entry = -cur_entry;
+
+ file_changed = FALSE;
+
+ return SUCCESS;
+}
+
+void adjust_totals()
+{
+ if (nPages > 0) {
+ total_pages -= nPages;
+ total_bytes -= st.st_size;
+ }
+}
+
+/* some accessors */
+unsigned int total_entries() { return nEntries; }
+
+int current_entry() { return cur_entry; }
+
+int is_softlink() { return S_ISLNK(st.st_mode); }
+
+int is_hardlink() { return isHardlink; }
+
+int is_directory() { return S_ISDIR(st.st_mode); }
+
+char * getFilename() { /* relative to basedir */ return &filename[0]; }
+
+char * getFullname() { return &fullname[0]; }
+
+/*
+ The following three fx are used to fill file_info into the send_buffer.
+ They return the number of bytes being written into the buf, including the \0 byte.
+*/
+unsigned int fill_in_stat(char *buf)
+{
+ /* load into buf area the stat info in ascii format */
+ if (isDelete)
+ sprintf(buf, "0 0 0 0 0 0 0 0");
+ else {
+ #ifdef _LARGEFILE_SOURCE
+ sprintf(buf, "%u %u %u %u %llu %lu %lu %d", st.st_mode, st.st_nlink,
+ st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime,
+ toRmFirst);
+ #else
+ sprintf(buf, "%u %u %u %u %lu %lu %lu %d", st.st_mode, st.st_nlink,
+ st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime,
+ toRmFirst);
+ #endif
+ }
+
+ return strlen(buf)+1;
+}
+
+unsigned int fill_in_filename(char * buf)
+{
+ strcpy(buf, filename);
+ return strlen(buf)+1;
+}
+
+unsigned int fill_in_linktar(char *buf)
+{
+ strcpy(buf, linktar);
+ return strlen(buf)+1;
+}
+
+
diff --git a/proto.h b/proto.h
new file mode 100644
index 0000000..6569d44
--- /dev/null
+++ b/proto.h
@@ -0,0 +1,182 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef __main_proto_h
+#define __main_proto_h
+
+/* parse_synclist.c */
+unsigned int total_entries();
+unsigned int fill_in_stat(char *buf);
+unsigned int fill_in_linktar(char *buf);
+unsigned int fill_in_filename(char * buf);
+unsigned int get_nPages();
+int pages_for_file();
+char * getFilename();
+char * getFullname();
+int same_stat_for_file();
+void strip(char * str);
+int current_entry();
+int get_next_entry(int current_file_id);
+int is_softlink();
+int is_directory();
+int is_hardlink();
+int init_synclist(char * synclist_path, char *bdir);
+void adjust_totals();
+
+/* backup.c */
+int read_backup_pattern(char * fpat_file);
+int needBackup(char * filename);
+
+/* sends.c */
+void init_sends();
+void set_mode(int new_mode);
+int send_page(int page);
+void send_test();
+void send_cmd(int code, int machine_id);
+void send_all_done_cmd();
+int fexist(int entry) ;
+void pack_open_file_info();
+void my_exit(int);
+
+/* complaints.c */
+void init_complaints();
+int read_handle_complaint(int cmd);
+void wait_for_ok(int code);
+void refresh_machine_status();
+void refresh_missing_pages();
+void mod_machine_status();
+void refresh_file_received();
+int nNotRecv();
+int iNotRecv();
+int is_it_missing(int page);
+int has_missing_pages();
+int has_sick_machines();
+void init_missing_page_flag(int n);
+void free_missing_page_flag();
+void refresh_machine_status();
+void init_machine_status(int n);
+void page_sent(int page);
+int nBadMachines();
+void do_badMachines_exit();
+int pr_missing_pages();
+int send_done_and_pr_msgs(double, double);
+void do_cntl_c(int signo);
+void set_has_missing();
+void reset_has_missing();
+void set_has_sick();
+void reset_has_sick();
+
+
+/* setup_socket.c */
+void set_delay(int secs, int usecs);
+void get_delay(int * secs, int * usecs);
+int readable(int fd);
+#ifndef IPV6
+int complaint_socket(struct sockaddr_in *addr, int port);
+int send_socket(struct sockaddr_in *addr, char * cp, int port);
+int rec_socket(struct sockaddr_in *addr, int port);
+#else
+int rec_socket(struct sockaddr_in6 *addr, int port);
+int send_socket(struct sockaddr_in6 *addr, char * cp, int port);
+int complaint_socket(struct sockaddr_in6 *addr, int port);
+#endif
+
+/* set_mcast.c */
+int mcast_set_if(int sockfd, const char *ifname, u_int ifindex);
+int mcast_set_loop(int sockfd, int onoff);
+int mcast_set_ttl(int sockfd, int val);
+
+/* set_catcher_mcast.c */
+int Mcast_join(int sockfd, const char *mcast_addr,
+ const char *ifname, u_int ifindex);
+void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr);
+
+/* complaint_sender.c */
+void fill_in_int(int i);
+void init_fill_ptr();
+void send_complaint(int complaint, int mid, int page, int file);
+void init_complaint_sender();
+#ifndef IPV6
+void update_complaint_address(struct sockaddr_in *sa);
+#else
+void update_complaint_address(struct sockaddr_in6 *sa);
+#endif
+
+/* page_reader.c */
+void init_page_reader();
+int check_queue();
+int read_handle_page();
+
+/* file_operations.c */
+void get_tmp_suffix();
+int extract_file_info(char * buf, int n_file, unsigned int n_pages);
+int open_file();
+int close_file();
+int rm_tmp_file();
+int delete_file(int to_check_dir_type);
+int touch_file();
+int nPages_for_file();
+int has_all_pages();
+int ask_for_missing_page();
+void missing_page_stat();
+void write_page(int page, char* data_ptr, int bytes);
+int is_missing(int page);
+void page_received(int page);
+int set_owner_perm_times();
+void close_last_file();
+int check_zero_page_entry();
+void default_suffix();
+
+/* timing */
+void refresh_timer();
+double get_accumulated_time();
+void start_timer();
+void end_timer();
+void update_time_accumulator();
+double get_accumulated_usec();
+void update_rtt_hist(unsigned int rtt);
+void pr_rtt_hist();
+void init_rtt_hist();
+unsigned int pages_wo_ack();
+
+/* signal.c */
+typedef void Sigfunc(int); /* for signal handlers */
+Sigfunc * Signal(int signo, Sigfunc *func);
+int Fcntl(int fd, int cmd, int arg);
+int Ioctl(int fd, int request, void *arg);
+void Sigemptyset(sigset_t *set);
+void Sigaddset(sigset_t *set, int signo);
+void Sigprocmask(int how, const sigset_t *set, sigset_t *oset);
+
+/* id_map.c */
+void get_machine_names(char * filename);
+char * id2name(int id);
+
+#endif
diff --git a/rtt.c b/rtt.c
new file mode 100644
index 0000000..71c8bd0
--- /dev/null
+++ b/rtt.c
@@ -0,0 +1,258 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+/*
+ To measure round trip time (RTT) using UDP
+*/
+
+#include "rttmain.h"
+#include <limits.h> /* to define PATH_MAX */
+#include <libgen.h>
+
+char * my_MCAST_ADDR = MCAST_ADDR;
+int my_FLOW_PORT = FLOW_PORT;
+int my_PORT = PORT;
+int my_TTL = MCAST_TTL;
+int my_LOOP = MCAST_LOOP;
+char * my_IFname = MCAST_IF;
+
+int no_feedback_count;
+char * machine = NULL;
+int remote_pid;
+char * reshell = REMOTE_SHELL;
+char catcher_path[PATH_MAX];
+
+void usage()
+{
+ fprintf(stderr,
+ "rtt (to measure rount trip time to a target version %s\n"
+ " Option list:\n"
+ " [ -v flag to turn on verbose]\n"
+ " [ -r <remote shell, default=%s> ]\n"
+ " [ -p <PATH for remote rttcatcher, default=%s> ]\n"
+ " -------- essential options ----------------------------------\n"
+ " -m <destination machine_name>\n"
+ " -n <num Of Pages>\n"
+ " -s <page_size in bytes; max=64512>\n"
+ " -------- mcast options --------------------------------------\n"
+ " [ -A <my_mcast_address default=%s)> ]\n"
+ " [ -P <my_PORT default=%d> ]\n"
+ " [ -T <my_TTL default=%d> ]\n"
+ " [ -L flag turn on mcast_LOOP. default is off ]\n"
+ " [ -I <my_MCAST_IF default=NULL> ]\n",
+ VERSION, reshell, catcher_path, my_MCAST_ADDR, my_PORT, my_TTL);
+}
+
+void get_dirname_of_rtt(char *dname, char *rtt_path)
+{
+ char path[PATH_MAX];
+ strcpy(path, rtt_path); /* dirname() will change its argument */
+ strcpy(dname, dirname(path));
+
+ if (strcmp(dname, ".")==0) {
+ strcpy(dname, getcwd(path, PATH_MAX));
+ }
+}
+
+void do_one_page(int page)
+{
+ unsigned long rtt;
+ refresh_timer();
+ start_timer();
+ send_page(page);
+ /* read_handle_complaint() waits n*interpage_interval at most */
+ if (read_handle_complaint()==0) { /* delay_sec for readable() is set by set_delay() */
+ /*
+ At this point, the readable() returns without getting a reply
+ from the target after n*DT_PERPAGE
+ This indicates that the page has likely been lost in the network.
+ */
+ if (verbose) fprintf(stderr, "no ack for page = %d\n", page);
+ ++no_feedback_count;
+ update_rtt_hist(999999); /* register this as rtt = infinite --- the last element in rtt_hist */
+ if (no_feedback_count>NO_FEEDBACK_COUNT_MAX) { /* count the consecutive no_feedback event */
+ /* switch to another client */
+ fprintf(stderr, "Consecutive non_feedback exceeds limit. Continue with next page...\n");
+ no_feedback_count = 0;
+ }
+ } else {
+ end_timer();
+ update_time_accumulator();
+ rtt = get_accumulated_usec();
+ /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */
+ /* to do: update histogram */
+ if (verbose>=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt);
+ update_rtt_hist(rtt);
+
+ no_feedback_count = 0;
+ }
+}
+
+int invoke_catcher(char * machine)
+{
+ FILE *ptr;
+ char buf[PATH_MAX];
+
+ /* invoke rttcatcher on remote machine */
+ fprintf(stderr, "using %s to invoke rttcatcher on %s\n", reshell, machine);
+
+ /* check if rsh (ssh) works */
+ sprintf(buf, "%s %s date", reshell, machine);
+ if (verbose) fprintf(stderr, "%s\n", buf);
+ if (system(buf)) {
+ fprintf(stderr, "cannot do rsh to the target machine = %s\n", machine);
+ exit(BAD_EXIT);
+ }
+
+ if (!my_IFname) {
+ sprintf(buf,
+ "%s %s '%s/rttcatcher -A %s -P %d < /dev/null 1>/dev/null 2>/dev/null & echo $!'",
+ reshell, machine, catcher_path, my_MCAST_ADDR, my_PORT);
+ } else {
+ sprintf(buf,
+ "%s %s '%s/rttcatcher -A %s -P %d -I %s < /dev/null 1>/dev/null 2>/dev/null & echo $!'",
+ reshell, machine, catcher_path, my_MCAST_ADDR, my_PORT, my_IFname);
+ }
+
+
+ fprintf(stderr, "%s\n", buf);
+ if ((ptr = popen(buf, "r")) == NULL) {
+ fprintf(stderr, "Failure to invoke rttcather\n");
+ exit(-1);
+ }
+ fgets(buf, PATH_MAX, ptr);
+ pclose(ptr);
+
+ return atoi(buf);
+}
+
+int main(int argc, char *argv[])
+{
+ int c;
+ int nPages =-1, pageSize=-1, ipage;
+
+ verbose = 0;
+ catcher_path[0] = '\0';
+
+ while ((c = getopt(argc, argv, "vm:s:n:r:p:A:P:T:LI:")) != EOF) {
+ switch (c) {
+ case 'v':
+ verbose = 1;
+ break;
+ case 'r':
+ reshell = optarg;
+ break;
+ case 'p':
+ strcpy(catcher_path, optarg);
+ break;
+ case 'A':
+ my_MCAST_ADDR = optarg;
+ break;
+ case 'P':
+ my_PORT = atoi(optarg);
+ my_FLOW_PORT = my_PORT -1;
+ break;
+ case 'T':
+ my_TTL = atoi(optarg);
+ break;
+ case 'L':
+ my_LOOP = TRUE;
+ break;
+ case 'I':
+ my_IFname = optarg;
+ break;
+ case 'n' :
+ nPages = atoi(optarg);
+ break;
+ case 'm':
+ machine = optarg;
+ break;
+ case 's':
+ pageSize = atoi(optarg);
+ if (pageSize>MAX_PAGE_SIZE) {
+ usage();
+ exit(-1);
+ }
+ break;
+ case '?':
+ usage();
+ exit(-1);
+ }
+ }
+
+ if (strlen(catcher_path)==0) {
+ get_dirname_of_rtt(catcher_path, argv[0]);
+ }
+
+
+ if (nPages == -1 || pageSize == -1 || machine == NULL) {
+ fprintf(stderr, "Essential options (-n -m -s) should be specified. \n");
+ usage();
+ exit(-1);
+ }
+
+ /* init */
+ init_sends(pageSize);
+ init_complaints();
+
+ /* set up Cntl_C catcher */
+ Signal(SIGINT, do_cntl_c);
+
+ remote_pid = invoke_catcher(machine);
+ fprintf(stderr, "remote pid = %d\n", remote_pid);
+
+ sleep(1);
+
+ /* -------------------Send data-------------------------------------- */
+
+ init_missing_page_flag(nPages);
+
+ send_cmd(START_CMD, nPages);
+ refresh_machine_status();
+ wait_for_ok(START_CMD);
+ do_badMachines_exit(machine, remote_pid);
+
+ fprintf(stderr, "Sending data...\n");
+ /* send pages */
+ set_mode(SENDING_DATA);
+ set_delay(0, DT_PERPAGE*FACTOR);
+
+ no_feedback_count = 0;
+ for (ipage = 0; ipage < nPages; ipage++){
+ do_one_page(ipage);
+ }
+
+ send_cmd(EOF_CMD, 0);
+ refresh_machine_status();
+ wait_for_ok(EOF_CMD);
+ do_badMachines_exit("", -1);
+
+ /* -----------------end of send data -------------------------------- */
+
+ free_missing_page_flag();
+ send_done_and_pr_msgs();
+ return 0;
+}
+
diff --git a/rttcatcher.c b/rttcatcher.c
new file mode 100644
index 0000000..1af74f1
--- /dev/null
+++ b/rttcatcher.c
@@ -0,0 +1,118 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Codes in this file are extracted and modified from multicatcher.c.
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+
+char * my_MCAST_ADDR = MCAST_ADDR;
+int my_FLOW_PORT = FLOW_PORT;
+int my_PORT = PORT;
+char * my_IFname = MCAST_IF;
+
+void usage()
+{
+ fprintf(stderr,
+ "rttcatcher (to receive pages on the target. version %s\n"
+ " Option list:\n"
+ " [ -v flag to turn on verbose]\n"
+ " -------- mcast options --------------------------------------\n"
+ " [ -A <my_mcast_address default=%s)> ]\n"
+ " [ -P <my_PORT default=%d> ]\n"
+ " [ -I <my_MCAST_IF default=NULL> ]\n",
+ VERSION, MCAST_ADDR, PORT);
+}
+
+int main(int argc, char *argv[])
+{
+ int old_mode; /* hp: from char to int for mode */
+ int mode;
+ int c;
+
+ verbose = 0;
+ while ((c = getopt(argc, argv, "vA:P:I:")) != EOF) {
+ switch (c) {
+ case 'v':
+ verbose = 1;
+ break;
+ case 'A':
+ my_MCAST_ADDR = optarg;
+ break;
+ case 'P':
+ my_PORT = atoi(optarg);
+ my_FLOW_PORT = my_PORT -1;
+ break;
+ case 'I':
+ my_IFname = optarg;
+ break;
+ case '?':
+ usage();
+ exit(-1);
+ }
+ }
+
+ init_page_reader();
+ init_complaint_sender();
+
+ /* initialize random numbers */
+ srand(time(NULL) + getpid());
+
+ /* Wait forever if necessary for first packet */
+ set_delay(0, -1);
+ mode = old_mode = TEST; /* hp: add mode */
+
+ while(1) { /* loop for all incoming pages */
+ if (verbose)
+ fprintf(stderr, "Starting listen loop with mode %d\n", mode);
+
+ mode = read_handle_page();
+ if (verbose) fprintf(stderr, "in mode %d\n", mode);
+
+ if (mode == ALL_DONE_CMD) break;
+
+ /* got no data? */
+ if (mode == TIMED_OUT) {
+ if (verbose) fprintf(stderr, "*");
+ continue;
+ } /* end if TIMED_OUT */
+
+ /* changing modes? */
+ if ((old_mode != SENDING_DATA) && (mode == SENDING_DATA)){
+ /* Taking data, wait at least 3 to 8 seconds */
+ set_delay( 3 + rand() % 8, 200);
+ if (verbose) fprintf(stderr, "Receiving data\n");
+ old_mode = mode;
+ continue;
+ }
+
+ /* all other modes */
+ old_mode = mode;
+
+ } /* end of incoming page loop */
+
+ if (verbose) fprintf(stderr, "Done!\n");
+ return 0;
+}
+
+
diff --git a/rttcomplaint_sender.c b/rttcomplaint_sender.c
new file mode 100644
index 0000000..20d7cda
--- /dev/null
+++ b/rttcomplaint_sender.c
@@ -0,0 +1,103 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Codes in this file are extracted and modified from complaint_sender.c
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+
+/* send socket */
+int complaint_fd;
+#ifndef IPV6
+struct sockaddr_in complaint_addr;
+#else
+struct sockaddr_in6 complaint_addr;
+#endif
+
+extern int my_FLOW_PORT;
+
+/* send buffer */
+char complaint_buffer[FLOW_BUFFSIZE];
+int *ccode_ptr; /* change from char to int -- mem alignment */
+int *cpage_ptr;
+
+/*----------------------------------------------------------
+ init_complaint_sender initializes the buffer to allow the
+ catcher to send complaints back to the sender.
+
+ ret_address of sender to whom we will complain
+ is determined when we receive the first UDP data
+ in read_handle_page() in page_reader.c
+ ----------------------------------------------------------*/
+void init_complaint_sender()
+{
+ if (verbose)
+ fprintf(stderr, "in init_complaint_sender\n");
+
+ /* init the send_socket */
+ complaint_fd = complaint_socket(&complaint_addr, my_FLOW_PORT);
+
+ ccode_ptr = (int *) complaint_buffer;
+ cpage_ptr = (int *)(ccode_ptr + 1);
+}
+
+#ifndef IPV6
+void update_complaint_address(struct sockaddr_in *sa)
+{
+ sock_set_addr((struct sockaddr *) &complaint_addr,
+ sizeof(complaint_addr), (void*)&sa->sin_addr);
+}
+#else
+void update_complaint_address(struct sockaddr_in6 *sa)
+{
+ sock_set_addr((struct sockaddr *) &complaint_addr,
+ sizeof(complaint_addr), (void*)&sa->sin6_addr);
+}
+#endif
+
+/*------------------------------------------------------------------------
+ send_complaint fills the complaint buffer and send it through our socket
+ back to the sender
+
+ The major use is to tell master machine which page of which file
+ needs to be re-transmitted.
+ complaint -- the complain code defined in main.h
+ file -- the file index
+ page -- page index
+ ------------------------------------------------------------------------*/
+void send_complaint(int complaint, int page)
+{
+ /* fill in the complaint data */
+ /* 20060323 add converting to network byte-order before sending out */
+ *ccode_ptr = htonl(complaint);
+ *cpage_ptr = htonl(page);
+
+ /* send it */
+ if( sendto(complaint_fd, complaint_buffer, FLOW_BUFFSIZE, 0,
+ (const struct sockaddr *)&complaint_addr,
+ sizeof(complaint_addr)) < 0) {
+ perror("Sending complaint\n");
+ }
+ if (verbose)
+ printf("Sent complaint:code=%d page=%d\n", complaint, page);
+}
diff --git a/rttcomplaints.c b/rttcomplaints.c
new file mode 100644
index 0000000..4f9ed9b
--- /dev/null
+++ b/rttcomplaints.c
@@ -0,0 +1,270 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ codes in this file are extracted and modified from complaints.c
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+#include <sys/times.h>
+
+/* buffer for receiving complaints */
+
+char flow_buff[FLOW_BUFFSIZE];
+int *code_ptr; /* What's wrong? */
+int *page_ptr; /* Which page */
+
+/* receive socket */
+int complaint_fd;
+#ifndef IPV6
+struct sockaddr_in complaint_addr;
+#else
+struct sockaddr_in6 complaint_addr;
+#endif
+
+extern int my_FLOW_PORT;
+
+/* status */
+char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */
+int total_missing_page = 0;
+char machine_status = NOT_READY;
+int nMachines = 1;
+int nPages;
+char *machine_list_file;
+
+extern char * machine;
+extern int remote_pid;
+extern char * reshell;
+
+/*
+ init_complaints initializes our buffers to receive complaint information
+ from the catchers
+*/
+void init_complaints ()
+{
+ if (verbose)
+ fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE);
+
+ /* Buffer */
+ code_ptr = (int *)flow_buff;
+ page_ptr = (int *)(code_ptr + 1);
+
+ /* Receive socket */
+ if (verbose) printf("set up receive socket for complaints\n");
+ complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT);
+}
+
+void init_missing_page_flag(int n)
+{
+ int i;
+ nPages = n;
+ if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) {
+ fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n);
+ perror("error = ");
+ exit(-1);
+ }
+ for(i=0; i<nPages; ++i) {
+ missing_page_flag[i] = RECEIVED;
+ }
+}
+
+void page_sent(int page)
+{
+ missing_page_flag[page] = RECEIVED;
+}
+
+void free_missing_page_flag()
+{
+ free(missing_page_flag);
+ missing_page_flag = NULL;
+}
+
+
+void refresh_machine_status()
+{
+ machine_status = NOT_READY;
+}
+
+int get_total_missing_pages()
+{
+ return total_missing_page;
+}
+
+int read_handle_complaint()
+{
+ /*
+ return 1 for receiving complaint
+ return 0 for no complaint handled
+ */
+ int code_v, page_v, bytes_read;
+
+ if (readable(complaint_fd)) {
+
+ /* There is a complaint */
+ bytes_read = recvfrom(complaint_fd, flow_buff, FLOW_BUFFSIZE, 0, NULL, NULL);
+
+ /* 20060323 deal with big- vs little-endian issue
+ convert incoming integers into host representation */
+
+ if (bytes_read != FLOW_BUFFSIZE) return 0;
+
+ code_v = ntohl(*code_ptr);
+ page_v = ntohl(*page_ptr);
+
+ switch (code_v) {
+ case PAGE_RECV:
+ return 1;
+ case START_OK :
+ case EOF_OK :
+ machine_status = MACHINE_OK;
+ return 1;
+ case MISSING_PAGE :
+ if (page_v > nPages) return 1; /* *page_ptr = page # (1 origin) */
+ ++(total_missing_page);
+ missing_page_flag[(page_v)-1] = MISSING;
+ return 1;
+ case LAST_MISSING :
+ if (page_v > nPages) return 1;
+ ++(total_missing_page);
+ missing_page_flag[page_v-1] = MISSING;
+ machine_status = MACHINE_OK;
+ return 1;
+ default :
+ printf("Unknown complaint: %d\n", code_v);
+ return 0;
+ } /* end of switch */
+ } /* end of if(readable) */
+
+ return 0;
+}
+
+int all_machine_ok()
+{
+ return (machine_status == NOT_READY ) ? 0 : 1;
+}
+
+void wait_for_ok(int code)
+{
+ int i, count;
+ time_t tloc;
+ time_t rtime0, rtime1;
+
+ rtime0 = time(&tloc); /* reference time */
+
+ count = 0;
+ while (!all_machine_ok()) {
+ if (read_handle_complaint()==1) { /* if there is a complaint handled */
+ rtime0 = time(&tloc); /* reset the reference time */
+ continue;
+ }
+ /* no complaints handled */
+ rtime1 = time(&tloc); /* time since last complaints */
+ if ((rtime1-rtime0) >= ACK_WAIT_PERIOD) {
+ ++count;
+ if (count < ACK_WAIT_TIMES) {
+ fprintf(stderr, "%d: resend cmd(%d) to machines:[ ", count, code);
+ for(i=0; i<nMachines; ++i) {
+ if (machine_status == NOT_READY) {
+ fprintf(stderr, "%d ", i);
+ send_cmd(code, (int) i);
+ usleep(FAST);
+ }
+ }
+ fprintf(stderr, "]\n");
+ rtime0 = rtime1;
+ } else {
+ fprintf(stderr, "Time out for the 'bad' machines:[ ");
+ for(i=0; i<nMachines; ++i) {
+ if (machine_status== NOT_READY) {
+ fprintf(stderr, "%d ", i);
+ }
+ }
+ fprintf(stderr, "]\n");
+ break;
+ }
+ }
+ }
+}
+
+int is_it_missing(int page)
+{
+ return (missing_page_flag[page]==MISSING) ? 1 : 0;
+}
+
+int has_missing_pages()
+{
+ int i;
+ for(i=0; i<nPages; ++i) {
+ if (missing_page_flag[i] == MISSING) {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+void pr_missing_pages()
+{
+ int N;
+
+ N = get_total_missing_pages();
+ fprintf(stderr, "Missing pages = %10d (%5.2f%%) out of total pages = %d\n",
+ N, (double)N/((double)nPages)*100.0, nPages);
+}
+
+void send_done_and_pr_msgs()
+{
+ send_all_done_cmd();
+ pr_missing_pages();
+ pr_rtt_hist();
+}
+
+void kill_pid()
+{
+ char cmd[100];
+ /* kill remote process in case where remote machine is not in the multicast network */
+ sprintf(cmd, "%s %s 'kill -9 %d'", reshell, machine, remote_pid);
+ fprintf(stderr, "To make sure we clean up the process on remote machine,\n");
+ fprintf(stderr, "%s\n", cmd);
+ system(cmd);
+}
+
+void do_cntl_c(int signo)
+{
+ fprintf(stderr, "Control_C interrupt detected!\n");
+ send_done_and_pr_msgs();
+ kill_pid();
+ exit(-1);
+}
+
+/* to do some cleanup before exit IF all machines are bad */
+void do_badMachines_exit(char* machine, int pid)
+{
+ if (machine_status != NOT_READY) return;
+ fprintf(stderr, "Remote machine is bad. Exit!\n");
+ send_done_and_pr_msgs();
+
+ if (pid > 0) {
+ kill_pid();
+ }
+
+ exit(-1);
+}
diff --git a/rttmain.h b/rttmain.h
new file mode 100644
index 0000000..78eb693
--- /dev/null
+++ b/rttmain.h
@@ -0,0 +1,126 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef __main_h
+#define __main_h
+
+#include <time.h>
+#include <utime.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
+#include <arpa/inet.h> /* inet(3) functions */
+#include <errno.h>
+#include <fcntl.h> /* for nonblocking */
+#include <sys/ioctl.h>
+#include <netdb.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/stat.h> /* for S_xxx file mode constants */
+#include <sys/uio.h> /* for iovec{} and readv/writev */
+#include <unistd.h>
+#include <sys/wait.h>
+#include <sys/time.h> /* timeval{} for select() */
+
+#define VERSION "3.1.0"
+
+/* logic values */
+#define FALSE 0
+#define TRUE 1
+#define FAIL (FALSE)
+#define SUCCESS (TRUE)
+#define GOOD_EXIT 0
+#define BAD_EXIT -1
+
+/* Ports and addresses */
+#define PORT 7900 /* for multicast */
+#define FLOW_PORT (PORT-1) /* for flow-control */
+#define MCAST_ADDR "239.255.67.200"
+#define MCAST_TTL 1
+#define MCAST_LOOP FALSE
+#define MCAST_IF NULL
+
+#define REMOTE_SHELL "rsh"
+
+#define NO_FEEDBACK_COUNT_MAX 5
+#define USEC_TO_IDLE 1000000
+
+/* Speed stuff */
+#define FAST 100 /* usec */
+#define DT_PERPAGE 8000 /* usec time interval between pages */
+#define FACTOR 50
+
+/* time for the master to wait for the acknowledgement */
+#define ACK_WAIT_PERIOD 1 /* secs (from time()); */
+#define ACK_WAIT_TIMES 60 /* wait for this many periods */
+
+/* complaints */
+#define TOO_FAST 100
+#define SEND_AGAIN 200
+#define START_OK 300
+#define MISSING_PAGE 500
+#define LAST_MISSING 600
+#define EOF_OK 700
+#define PAGE_RECV 800
+
+#define FLOW_BUFFSIZE (2 * sizeof(int))
+
+#define PAGE_SIZE 64512 /* max page_size allowed */
+#define HEAD_SIZE (3 * sizeof(int))
+#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE)
+#define TOTAL_REC_PAGE 20 /* 31 20 */
+
+/* Modes */
+#define TIMED_OUT 0
+#define TEST 1
+#define SENDING_DATA 2
+#define RESENDING_DATA 3
+#define START_CMD 4
+#define EOF_CMD 5
+#define ALL_DONE_CMD 6
+#define NULL_CMD 7
+
+/* machine status */
+#define MACHINE_OK '\1'
+#define NOT_READY '\0'
+
+/* PAGE STATUS */
+#define MISSING '\0'
+#define RECEIVED '\1'
+
+/* MACHINE STATE */
+#define IDLE_STATE 0
+#define GET_DATA_STATE 1
+#define DATA_READY_STATE 2
+
+#define MAX_PAGE_SIZE 64512
+
+int verbose;
+
+#include "rttproto.h"
+
+#endif
diff --git a/rttmissings.c b/rttmissings.c
new file mode 100644
index 0000000..774e8b2
--- /dev/null
+++ b/rttmissings.c
@@ -0,0 +1,93 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+
+char * missingPages = NULL; /* array of flags */
+int nPages;
+
+int init_missingPages(int n)
+{
+ int i;
+
+ nPages = n;
+
+ /* init missingPages flags */
+ if (missingPages != NULL) free(missingPages); /* for second round */
+ missingPages = malloc(sizeof(char) * nPages);
+ for(i=0; i < nPages; ++i)
+ missingPages[i] = MISSING;
+
+ return 0;
+}
+
+int get_total_pages()
+{
+ return nPages;
+}
+
+int missing_pages()
+{
+ int result;
+ int i;
+
+ result = 0;
+
+ for(i=0; i < nPages; ++i)
+ if ((missingPages[i]) == MISSING) ++result;
+ return result;
+}
+
+int is_missing(int page)
+{
+ return (missingPages[page] == MISSING) ? 1 : 0;
+}
+
+void page_received(int page)
+{
+ missingPages[page] = RECEIVED;
+}
+
+int ask_for_missing_page()
+{
+ int i;
+ int n, count;
+
+ n = missing_pages();
+ if (n == 0) {
+ /* send_complaint(EOF_OK, machineID, 0); */
+ return 0; /* nothing is missing */
+ }
+
+ count = 0;
+ for(i=0; i < nPages; ++i) {
+ if ( missingPages[i] == MISSING ) {
+ ++count;
+ send_complaint((count==n) ? LAST_MISSING : MISSING_PAGE, i+1);
+ usleep(DT_PERPAGE);
+ }
+ }
+
+ return 1; /* there is something missing */
+}
+
diff --git a/rttpage_reader.c b/rttpage_reader.c
new file mode 100644
index 0000000..35940e5
--- /dev/null
+++ b/rttpage_reader.c
@@ -0,0 +1,188 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Codes in this file are extraced and modified from page_reader.c
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+
+/* the following is needed on Sun but not on linux */
+#ifdef _SUN
+#include <sys/filio.h>
+#endif
+
+extern char * my_MCAST_ADDR; /* defined in rttcatcher.c */
+extern int my_PORT;
+extern char * my_IFname;
+int machineState;
+int isFirstPage = TRUE;
+
+int recfd;
+#ifndef IPV6
+struct sockaddr_in rec_addr;
+#else
+struct sockaddr_in6 rec_addr;
+#endif
+
+int *mode_ptr; /* change from char to int */
+int *total_bytes_ptr;
+int *current_page_ptr;
+char *data_ptr;
+char rec_buf[PAGE_BUFFSIZE];
+
+void init_page_reader()
+{
+ struct ip_mreq mreq;
+ int rcv_size;
+
+ machineState = IDLE_STATE;
+
+ /* Prepare buffer */
+ mode_ptr = (int*)rec_buf; /* hp: add the cast (int *) */
+ current_page_ptr = (int *)(mode_ptr + 1);
+ total_bytes_ptr = (int *)(current_page_ptr + 1);
+ data_ptr = (char *)(total_bytes_ptr + 1);
+
+ /* Set up receive socket */
+ if (verbose) fprintf(stderr, "setting up receive socket\n");
+ recfd = rec_socket(&rec_addr, my_PORT);
+
+ /* Join the multicast group */
+ if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) {
+ perror("Joining Multicast Group");
+ }
+
+ /* Increase socket receive buffer */
+ rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE;
+ if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){
+ perror("Expanding receive buffer");
+ }
+}
+
+
+/*
+ This is the heart of catcher.
+ It parses the incoming pages and do proper reactions according
+ to the mode (command code) encoded in the first 4 bytes in an UDP page.
+ It returns the mode.
+
+ Note: since rtt is intended to deal with one-to-one machine,
+ the four-state engine as in page_reader is not used.
+*/
+int read_handle_page()
+{
+ #ifndef IPV6
+ struct sockaddr_in return_addr;
+ #else
+ struct sockaddr_in6 return_addr;
+ #endif
+
+ int bytes_read;
+ socklen_t return_len = (socklen_t)sizeof(return_addr);
+ int mode_v, total_bytes_v, current_page_v;
+
+ /* -----------receiving data -----------------*/
+ if (readable(recfd) == 1) { /* there is data coming in */
+ /* get data */
+ bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0,
+ (struct sockaddr *)&return_addr,
+ (socklen_t*) &return_len);
+
+ total_bytes_v = ntohl(*total_bytes_ptr);
+ if (bytes_read != total_bytes_v) return NULL_CMD;
+
+ /* convert from network byte order to host byte order */
+ mode_v = ntohl(*mode_ptr);
+ current_page_v = ntohl(*current_page_ptr);
+
+ if (isFirstPage) {
+ update_complaint_address(&return_addr);
+ isFirstPage = FALSE;
+ }
+
+ /* get init wish list and return address first time only
+ if (firstTime){
+ if (verbose)
+ fprintf(stderr, "Initializing complaint_sender and wish_list\n");
+ init_complaint_sender(&return_addr);
+ firstTime = 0;
+ }
+ */
+
+ /* --- process various commands */
+ switch (mode_v) {
+ case TEST:
+ /* It is just a test packet? */
+ fprintf(stderr, "********** Received test packet **********\n");
+ return mode_v;
+
+ case START_CMD:
+ if (verbose)
+ fprintf(stderr, "Start cmd received ---\n");
+ init_missingPages(current_page_v); /* Here: use current_page for total_pages */
+ send_complaint(START_OK, 0);
+ return mode_v;
+
+ case EOF_CMD:
+ if (verbose)
+ fprintf(stderr, "Check and ask for missing pages ---\n");
+
+ if (ask_for_missing_page()==0) {
+ /*
+ There is no missing page.
+ */
+ send_complaint(EOF_OK, 0);
+ }
+ /*
+ else
+ There are missing pages.
+ Ack has been done in ask_for_missing_page()
+ */
+
+ return mode_v;
+
+ case SENDING_DATA:
+ case RESENDING_DATA:
+ if (verbose){
+ fprintf(stderr, "Got %d bytes from page %d of %d, mode=%d\n",
+ bytes_read - HEAD_SIZE,
+ current_page_v, get_total_pages(), mode_v);
+ }
+
+ /* mode == SENDING_DATA, RESENDING_DATA */
+ page_received(current_page_v);
+ send_complaint(PAGE_RECV, 0);
+
+ /* Yes, we read a page */
+ return mode_v;
+ case ALL_DONE_CMD: /* this is presumably a good machine */
+ default:
+ return mode_v;
+ } /* end of switch */
+ } else {
+ /* No, the read timed out */
+ return TIMED_OUT;
+ }
+}
+
+
diff --git a/rttproto.h b/rttproto.h
new file mode 100644
index 0000000..6d4cf62
--- /dev/null
+++ b/rttproto.h
@@ -0,0 +1,116 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef __rttproto_h
+#define __rttproto_h
+
+/* rttsends */
+void init_sends(int n);
+void set_mode(int new_mode);
+int send_page(int page);
+void send_cmd(int code, int machine_id);
+void send_all_done_cmd();
+
+/* rttcomplaints */
+void init_complaints();
+int read_handle_complaint();
+void wait_for_ok(int code);
+void refresh_machine_status();
+int is_it_missing(int page);
+int has_missing_pages();
+void init_missing_page_flag(int n);
+void free_missing_page_flag();
+void refresh_machine_status();
+int get_total_missing_pages();
+void page_sent(int page);
+void pr_missing_pages();
+void do_cntl_c(int signo);
+void send_done_and_pr_msgs();
+void do_badMachines_exit(char * machine, int pid);
+
+/* setup_socket.c */
+void set_delay(int secs, int usecs);
+int readable(int fd);
+#ifndef IPV6
+int complaint_socket(struct sockaddr_in *addr, int port);
+int send_socket(struct sockaddr_in *addr, char * cp, int port);
+int rec_socket(struct sockaddr_in *addr, int port);
+#else
+int rec_socket(struct sockaddr_in6 *addr, int port);
+int send_socket(struct sockaddr_in6 *addr, char * cp, int port);
+int complaint_socket(struct sockaddr_in6 *addr, int port);
+#endif
+
+/* set_mcast.c */
+int mcast_set_if(int sockfd, const char *ifname, u_int ifindex);
+int mcast_set_loop(int sockfd, int onoff);
+int mcast_set_ttl(int sockfd, int val);
+
+/* set_catcher_mcast.c */
+int Mcast_join(int sockfd, const char *mcast_addr,
+ const char *ifname, u_int ifindex);
+void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr);
+
+/* rttcomplaint_sender */
+void send_complaint(int complaint, int page);
+void init_complaint_sender();
+#ifndef IPV6
+void update_complaint_address(struct sockaddr_in *sa);
+#else
+void update_complaint_address(struct sockaddr_in6 *sa);
+#endif
+
+/* rttpage_reader */
+void init_page_reader();
+int read_handle_page();
+
+/* rttmissings */
+int init_missingPages(int n);
+int missing_pages();
+int is_missing(int page);
+void page_received(int page);
+int ask_for_missing_page();
+int get_total_pages();
+
+/* timing */
+void refresh_timer();
+double get_accumulated_time();
+void start_timer();
+void end_timer();
+void update_time_accumulator();
+double get_accumulated_usec();
+void update_rtt_hist(unsigned int rtt);
+void pr_rtt_hist();
+void init_rtt_hist();
+
+/* signal.c */
+typedef void Sigfunc(int); /* for signal handlers */
+Sigfunc * Signal(int signo, Sigfunc *func);
+int Fcntl(int fd, int cmd, int arg);
+int Ioctl(int fd, int request, void *arg);
+void Sigemptyset(sigset_t *set);
+void Sigaddset(sigset_t *set, int signo);
+void Sigprocmask(int how, const sigset_t *set, sigset_t *oset);
+
+#endif
diff --git a/rttsends.c b/rttsends.c
new file mode 100644
index 0000000..7038e2c
--- /dev/null
+++ b/rttsends.c
@@ -0,0 +1,144 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ codes in this file are extracted and modified from sends.c
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "rttmain.h"
+#include <net/if.h>
+#ifdef _SUN
+#include <sys/sockio.h> /* define SIOCGIFADDR */
+#else
+#include <linux/sockios.h>
+#endif
+
+extern char * my_MCAST_ADDR;
+extern int my_PORT;
+extern int my_TTL;
+extern int my_LOOP;
+extern char * my_IFname;
+
+/* buffer for sending (same structure as those in sends.c */
+int *mode_ptr; /* char type would cause alignment problem on Sparc */
+int *current_page_ptr;
+int *total_bytes_ptr;
+char *data_ptr;
+char send_buff[PAGE_BUFFSIZE];
+
+int pageSize;
+
+/* Send socket */
+int send_fd;
+#ifndef IPV6
+struct sockaddr_in send_addr;
+#else
+struct sockaddr_in6 send_addr;
+#endif
+
+/*
+ set_mode sets the caster into a new mode.
+ modes are defined in main.h:
+*/
+void set_mode(int new_mode)
+{
+ *mode_ptr = htonl(new_mode);
+}
+
+
+/* init_sends initializes the send buffer */
+void init_sends(int npagesize)
+{
+ pageSize = (npagesize>PAGE_SIZE) ? PAGE_SIZE : npagesize;
+
+ mode_ptr = (int *)send_buff; /* hp: add (int*) */
+ current_page_ptr = (int *) (mode_ptr + 1);
+ total_bytes_ptr = (int *)(current_page_ptr + 1);
+ data_ptr = (char *)(total_bytes_ptr + 1);
+
+ send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT);
+
+ /******* change MULTICAST_IF ********/
+ if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0)
+ perror("init_sends(): when set MULTICAST_IF\n");
+
+ /* set multicast_ttl such that UDP can go to 2nd subnetwork */
+ if (mcast_set_ttl(send_fd, my_TTL) < 0)
+ perror("init_sends(): when set MULTICAST_TTL\n");
+
+ /* disable multicast_loop such that there is no echo back on master */
+ if (mcast_set_loop(send_fd, my_LOOP) < 0)
+ perror("init_sends(): when set MULTICAST_LOOP\n");
+
+ /* put dummy contents into send (UDP) buffer */
+ memset(data_ptr, 1, PAGE_SIZE);
+}
+
+/*
+ send_buffer will send the buffer with the file information
+ out to the socket connection with the catcher.
+*/
+int send_buffer(int bytes_read)
+{
+ /* Else send the data */
+ if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE,
+ 0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) {
+ perror("Sending packet");
+ exit(1);
+ }
+ return (1);
+}
+
+/*
+ send_page takes a page from the current file and controls
+ sending it out the socket to the catcher. It calls send_buffer
+ to do the actuall call to sendto.
+*/
+int send_page(int page)
+{
+ if (verbose>=2) fprintf(stderr, "in send_page\n");
+ *total_bytes_ptr = htonl(pageSize+HEAD_SIZE);
+ *current_page_ptr = htonl(page);
+
+ return send_buffer(pageSize);
+}
+
+
+void send_cmd(int code, int pages)
+{
+ *mode_ptr = htonl(code);
+ *current_page_ptr = htonl(pages);
+ *total_bytes_ptr = htonl(HEAD_SIZE);
+
+ send_buffer(0);
+}
+
+
+void send_all_done_cmd()
+{
+ *mode_ptr = htonl(ALL_DONE_CMD);
+ *current_page_ptr = 0;
+ *total_bytes_ptr = htonl(HEAD_SIZE) ;
+
+ send_buffer(0);
+ if (verbose) fprintf(stderr, "(ALL DONE)\n");
+}
diff --git a/sends.c b/sends.c
new file mode 100644
index 0000000..61aa07a
--- /dev/null
+++ b/sends.c
@@ -0,0 +1,329 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Following the suggestion by Robert Dack <robd@kelman.com>,
+ I added the option to change the default IP address for multicasting
+ and the PORT for flow control. See mrsync.py for the new options.
+
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <net/if.h>
+#ifdef _SUN
+#include <sys/sockio.h> /* define SIOCGIFADDR */
+#else
+#include <linux/sockios.h>
+#endif
+
+extern char * my_MCAST_ADDR; /* defined in multicaster.c */
+extern int my_PORT; /* ditto */
+extern int my_TTL;
+extern int my_LOOP;
+extern char * my_IFname; /* defined in multicaster.c */
+extern int verbose;
+
+extern unsigned int nPages;
+extern unsigned int last_bytes; /* the number of bytes in the last page of a file */
+extern int cur_entry;
+extern int file_changed;
+extern char* cmd_name[];
+
+/* Where have we last sent from? */
+int most_recent_file;
+int most_recent_fd;
+
+/*
+ Send buffer for storing the data and to be transmitted thru UDP
+ The format:
+ (5*sizeof(int) bytes header) + (PAGE_SIZE data area)
+
+ The header has five int_type (4 bytes) int's.
+ (1) mode -- for master to give instructions to the target machines.
+ (2) current file index (starting with 1)
+ (3) current page index (starting with 1) -- gee!
+ (4) bytes to be sent in this page via UPD
+ (5) total number of pages for this file
+
+ data_ptr points to the data area.
+*/
+int *mode_ptr; /* char type would cause alignment problem on Sparc */
+int *current_file_ptr;
+int *current_page_ptr;
+int *bytes_sent_ptr;
+int *total_pages_ptr;
+char *data_ptr;
+char *fill_here;
+char send_buff[PAGE_BUFFSIZE];
+
+/* Send socket */
+int send_fd;
+#ifndef IPV6
+struct sockaddr_in send_addr;
+#else
+struct sockaddr_in6 send_addr;
+#endif
+
+/* for final statistics */
+extern unsigned int total_pages;
+extern off_t total_bytes;
+off_t real_total_bytes;
+unsigned int real_total_pages;
+
+/*
+ set_mode sets the caster into a new mode.
+ modes are defined in main.h:
+*/
+void set_mode(int new_mode)
+{
+ /* 20060323 convert it to network byte order */
+ *mode_ptr = htonl(new_mode);
+}
+
+/* init_sends initializes the send buffer */
+void init_sends()
+{
+ most_recent_file = most_recent_fd = -99999;
+ real_total_bytes = 0;
+ real_total_pages = 0;
+
+ /* pointers for send buffer */
+ mode_ptr = (int *)send_buff;
+ current_file_ptr = (int *)(mode_ptr+1);
+ current_page_ptr = (int *)(current_file_ptr + 1);
+ bytes_sent_ptr = (int *)(current_page_ptr + 1);
+ total_pages_ptr = (int *)(bytes_sent_ptr + 1);
+ data_ptr = (char *)(total_pages_ptr + 1);
+ fill_here = data_ptr;
+
+ /* send socket */
+ send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT);
+
+ /******* change MULTICAST_IF ********/
+ if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0)
+ perror("init_sends(): when set MULTICAST_IF\n");
+
+ /* set multicast_ttl such that UDP can go to 2nd subnetwork */
+ if (mcast_set_ttl(send_fd, my_TTL) < 0)
+ perror("init_sends(): when set MULTICAST_TTL\n");
+
+ /* disable multicast_loop such that there is no echo back on master */
+ if (mcast_set_loop(send_fd, my_LOOP) < 0)
+ perror("init_sends(): when set MULTICAST_LOOP\n");
+}
+
+void clear_send_buf()
+{
+ fill_here = data_ptr;
+}
+
+/*
+ put file contents into send (UDP) buffer
+ return the number of bytes put into the buffer.
+*/
+ssize_t pack_page_for_file(int page)
+{
+ if(verbose>=2)
+ fprintf(stderr, "Sending page %d of file %d\n", page, current_entry());
+
+ /***
+ Adjust the position for reading
+ NOTE:if the type (off_t) is not given, the large file operation
+ would fail.
+ ***/
+ lseek(most_recent_fd, (off_t)PAGE_SIZE * (off_t)(page - 1), SEEK_SET);
+
+ /* read it and put the content into send_buff */
+ /* the max number this read() will return is PAGE_SIZE */
+ return read(most_recent_fd, data_ptr, PAGE_SIZE);
+}
+
+int fexist(int entry)
+{
+ if (entry != most_recent_file) {
+ if (most_recent_fd > 0) close(most_recent_fd); /* make sure we close it */
+
+ if((most_recent_fd = open(getFullname(), O_RDONLY, 0)) < 0){
+ perror(getFullname());
+ }
+ most_recent_file = entry;
+ }
+ return (most_recent_fd >= 0); /* <0 means FAIL */
+}
+
+
+/*
+ send_buffer will send the buffer with the file information
+ out to the socket connection with the catcher.
+ return 0 -- ok, -1 sent failed.
+*/
+int send_buffer(int bytes_read)
+{
+ /* send the data */
+ if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE,
+ 0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) {
+ perror("Sending packet");
+ return FAIL;
+ }
+ return SUCCESS;
+}
+
+/*
+ send_page takes a page from the current file and controls
+ sending it out the socket to the catcher. It calls send_buffer
+ to do the actuall call to sendto.
+*/
+int send_page(int page)
+{
+ unsigned int bytes;
+
+ if (verbose>=2) fprintf(stderr, "In send_page()\n");
+
+ if (file_changed) {
+ total_bytes -= ((page<nPages) ? PAGE_SIZE : last_bytes);
+ --total_pages;
+ return FAIL;
+ }
+
+ bytes = (unsigned int) pack_page_for_file(page);
+
+ if ((page < nPages && bytes != PAGE_SIZE) ||
+ (nPages == page && bytes != last_bytes)) {
+ /* file under sync must have been changed during syncing */
+ fprintf(stderr, "Warning: read() error, expected_bytes= %d, real= %d, "
+ "page = %d, nPages = %s%d, for file %s\n",
+ (page<nPages) ? PAGE_SIZE : last_bytes,
+ bytes, page, (current_entry()<0) ? "-" : "", nPages, getFullname());
+ file_changed = TRUE;
+ /* make corrections in total_xxx that we send
+ otherwise the final statistic will be messed up */
+ total_bytes -= ((page<nPages) ? PAGE_SIZE : last_bytes);
+ --total_pages;
+ return FAIL;
+ }
+
+ /* fill in the header data */
+ /* 20060323 -- add htonl so that the codes can work across
+ different machines with either little- or big-endian.
+ So, before we send out ints, we convert them to network-byte order*/
+ *total_pages_ptr = htonl(nPages);
+ *bytes_sent_ptr = htonl(bytes);
+ *current_file_ptr = htonl(cur_entry);
+ *current_page_ptr = htonl(page);
+
+ /* for final statistics */
+ ++real_total_pages;
+ real_total_bytes += bytes;
+
+ if (verbose>=3)
+ fprintf(stderr, "Sending page=%d of %d in file %d of %d\n",
+ page, nPages,
+ cur_entry, total_entries());
+ return send_buffer(bytes);
+}
+
+/*
+ send_test zeroes out the buffers going to the catcher
+ and thereby sends a test packet to the catcher.
+*/
+void send_test()
+{
+ *mode_ptr = htonl(TEST); /* --> network byte order */
+
+ *current_file_ptr = 0;
+ *current_page_ptr = 0;
+ *total_pages_ptr = 0;
+ *bytes_sent_ptr = 0;
+
+ send_buffer(0);
+ fprintf(stderr, "Test packet is sent.\n");
+}
+
+void send_cmd(int code, int machine_id)
+{
+ /*
+ Except for OPEN_FILE_CMD,
+ only the header area in the send_buff gets filled
+ with data.
+ */
+ *mode_ptr = htonl(code); /* --> network byte order */
+ *current_page_ptr = htonl(machine_id); /* -1 being all machines */
+ *current_file_ptr = htonl(cur_entry);
+ *total_pages_ptr = htonl(nPages);
+
+ *bytes_sent_ptr = (code == OPEN_FILE_CMD) ? htonl(fill_here - data_ptr) :0;
+
+ /* do the header in send_buf */
+ send_buffer((code == OPEN_FILE_CMD) ? fill_here - data_ptr : 0);
+
+ /* print message */
+ if (verbose>=2) {
+ fprintf(stderr, "cmd [%s] sent\n", cmd_name[code]);
+ }
+}
+
+void pack_open_file_info()
+{
+ /* prepare udp send_buffer for file info
+ (header) (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0
+ */
+ clear_send_buf();
+ fill_here += fill_in_stat(data_ptr);
+ fill_here += fill_in_filename(fill_here);
+ if (is_softlink() || is_hardlink()) {
+ fill_here += fill_in_linktar(fill_here);
+ }
+}
+
+void send_all_done_cmd()
+{
+ int i;
+ *mode_ptr = htonl(ALL_DONE_CMD); /* --> network byte order */
+
+ *current_file_ptr = 0;
+ *current_page_ptr = 0;
+ *total_pages_ptr = 0;
+ *bytes_sent_ptr = 0;
+
+ for(i=0; i<10; ++i) { /* do it many times, in case network is busy */
+ send_buffer(0);
+ usleep(DT_PERPAGE*10);
+ }
+ /* NOTE: it is still possible that ALL_DONE msg could not
+ be received by targets.
+ For total robustness, some independent checking on targets
+ should be done.
+ */
+ fprintf(stderr, "(ALL DONE)\n");
+}
+
+void my_exit(int good_or_bad)
+{
+ if (send_fd) send_all_done_cmd();
+ exit(good_or_bad);
+}
diff --git a/set_catcher_mcast.c b/set_catcher_mcast.c
new file mode 100644
index 0000000..9a04017
--- /dev/null
+++ b/set_catcher_mcast.c
@@ -0,0 +1,142 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This file collects functions related to setting multicast
+ for multicatcher. They are IPv4 and IPv6 ready.
+ By default, we use IPv4.
+ To use IPv6, we need to specify -DIPv6 in Makefile.
+ The functions in this file are collected from
+ Richard Stevens' Networking bible: Unix Network programming
+
+ I added Mcast_join().
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <net/if.h>
+#ifdef _SUN
+#include <sys/sockio.h> /* define SIOCGIFADDR */
+#else
+#include <linux/sockios.h>
+#endif
+
+#define SA struct sockaddr
+
+int mcast_join(int sockfd, const SA *sa, socklen_t salen,
+ const char *ifname, u_int ifindex)
+{
+ switch (sa->sa_family) {
+ case AF_INET: {
+ struct ip_mreq mreq;
+ struct ifreq ifreq;
+
+ memcpy(&mreq.imr_multiaddr,
+ &((struct sockaddr_in *) sa)->sin_addr,
+ sizeof(struct in_addr));
+
+ if (ifindex > 0) {
+ if (if_indextoname(ifindex, ifreq.ifr_name) == NULL) {
+ errno = ENXIO; /* i/f index not found */
+ return(-1);
+ }
+ goto doioctl;
+ } else if (ifname != NULL) {
+ strncpy(ifreq.ifr_name, ifname, IFNAMSIZ);
+doioctl:
+ if (ioctl(sockfd, SIOCGIFADDR, &ifreq) < 0)
+ return(-1);
+ memcpy(&mreq.imr_interface,
+ &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr,
+ sizeof(struct in_addr));
+ } else
+ mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+
+ return(setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+ &mreq, sizeof(mreq)));
+ }
+
+#ifdef IPV6
+ case AF_INET6: {
+ struct ipv6_mreq mreq6;
+
+ memcpy(&mreq6.ipv6mr_multiaddr,
+ &((struct sockaddr_in6 *) sa)->sin6_addr,
+ sizeof(struct in6_addr));
+
+ if (ifindex > 0)
+ mreq6.ipv6mr_interface = ifindex;
+ else if (ifname != NULL)
+ if ( (mreq6.ipv6mr_interface = if_nametoindex(ifname)) == 0) {
+ errno = ENXIO; /* i/f name not found */
+ return(-1);
+ }
+ else
+ mreq6.ipv6mr_interface = 0;
+
+ return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
+ &mreq6, sizeof(mreq6)));
+ }
+#endif
+
+ default:
+ errno = EPROTONOSUPPORT;
+ return(-1);
+ }
+}
+
+int Mcast_join(int sockfd, const char *mcast_addr,
+ const char *ifname, u_int ifindex)
+{
+ #ifndef IPV6
+ /* IPv4 */
+ struct sockaddr_in sa;
+ sa.sin_family = AF_INET;
+ inet_pton(AF_INET, mcast_addr, &sa.sin_addr);
+ #else
+ struct sockaddr_in6 sa;
+ sa.sin6_family = AF_INET6;
+ inet_pton(AF_INET6, mcast_addr, &sa.sin6_addr);
+ #endif
+
+ return (mcast_join(sockfd, (struct sockaddr *) &sa, sizeof(sa),
+ ifname, ifindex));
+}
+
+void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr)
+{
+ switch (sa->sa_family) {
+ case AF_INET: {
+ struct sockaddr_in *sin = (struct sockaddr_in *) sa;
+
+ memcpy(&sin->sin_addr, addr, sizeof(struct in_addr));
+ return;
+ }
+
+ #ifdef IPV6
+ case AF_INET6: {
+ struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *) sa;
+
+ memcpy(&sin6->sin6_addr, addr, sizeof(struct in6_addr));
+ return;
+ }
+ #endif
+ }
+
+ return;
+}
+
diff --git a/set_mcast.c b/set_mcast.c
new file mode 100644
index 0000000..aac3d36
--- /dev/null
+++ b/set_mcast.c
@@ -0,0 +1,160 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This file collects functions related to setting multicast
+ for multicaster. They are IPv4 and IPv6 ready.
+ By default, we use IPv4.
+ To use IPv6, we need to specify -DIPv6 in Makefile.
+ The functions in this file are collected from
+ Richard Stevens' Networking bible: Unix Network programming
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "main.h"
+#include <net/if.h>
+#ifdef _SUN
+#include <sys/sockio.h> /* define SIOCGIFADDR */
+#else
+#include <linux/sockios.h>
+#endif
+
+#define SA struct sockaddr
+#define MAXSOCKADDR 128 /* max socket address structure size */
+
+int sockfd_to_family(int sockfd)
+{
+ union {
+ struct sockaddr sa;
+ char data[MAXSOCKADDR];
+ } un;
+ socklen_t len;
+
+ len = MAXSOCKADDR;
+ if (getsockname(sockfd, (SA *) un.data, &len) < 0)
+ return(-1);
+ return(un.sa.sa_family);
+}
+
+int mcast_set_if(int sockfd, const char *ifname, u_int ifindex)
+{
+ switch (sockfd_to_family(sockfd)) {
+ case AF_INET: {
+ struct in_addr inaddr;
+ struct ifreq ifreq;
+
+ if (ifindex > 0) {
+ if (if_indextoname(ifindex, ifreq.ifr_name) == NULL) {
+ errno = ENXIO; /* i/f index not found */
+ return(-1);
+ }
+ goto doioctl;
+ } else if (ifname != NULL) {
+ strncpy(ifreq.ifr_name, ifname, IFNAMSIZ);
+doioctl:
+ if (ioctl(sockfd, SIOCGIFADDR, &ifreq) < 0)
+ return(-1);
+ memcpy(&inaddr,
+ &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr,
+ sizeof(struct in_addr));
+ } else
+ inaddr.s_addr = htonl(INADDR_ANY); /* remove prev. set default */
+
+ return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_IF,
+ &inaddr, sizeof(struct in_addr)));
+ }
+
+#ifdef IPV6
+ case AF_INET6: {
+ u_int index;
+
+ if ( (index = ifindex) == 0) {
+ if (ifname == NULL) {
+ errno = EINVAL; /* must supply either index or name */
+ return(-1);
+ }
+ if ( (index = if_nametoindex(ifname)) == 0) {
+ errno = ENXIO; /* i/f name not found */
+ return(-1);
+ }
+ }
+ return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_IF,
+ &index, sizeof(index)));
+ }
+#endif
+
+ default:
+ errno = EPROTONOSUPPORT;
+ return(-1);
+ }
+}
+
+int mcast_set_loop(int sockfd, int onoff)
+{
+ switch (sockfd_to_family(sockfd)) {
+ case AF_INET: {
+ u_char flag;
+
+ flag = onoff;
+ return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_LOOP,
+ &flag, sizeof(flag)));
+ }
+
+#ifdef IPV6
+ case AF_INET6: {
+ u_int flag;
+
+ flag = onoff;
+ return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
+ &flag, sizeof(flag)));
+ }
+#endif
+
+ default:
+ errno = EPROTONOSUPPORT;
+ return(-1);
+ }
+}
+/* end mcast_set_loop */
+
+int mcast_set_ttl(int sockfd, int val)
+{
+ switch (sockfd_to_family(sockfd)) {
+ case AF_INET: {
+ u_char ttl;
+
+ ttl = val;
+ return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL,
+ &ttl, sizeof(ttl)));
+ }
+
+#ifdef IPV6
+ case AF_INET6: {
+ int hop;
+
+ hop = val;
+ return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
+ &hop, sizeof(hop)));
+ }
+#endif
+
+ default:
+ errno = EPROTONOSUPPORT;
+ return(-1);
+ }
+}
+
diff --git a/setup_socket.c b/setup_socket.c
new file mode 100644
index 0000000..3625a2a
--- /dev/null
+++ b/setup_socket.c
@@ -0,0 +1,242 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ make it IPv6-ready
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ file name is changed from main.c to setup_socket.c
+ Copyright (C) 2001 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ This file was modified in 2001 and later from files in the program
+ multicaster copyrighted by Aaron Hillegass as found at
+ <http://sourceforge.net/projects/multicaster/>
+
+ Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+/*
+ This part was based on
+ (1) codes by Aaron Hillegass <aaron@classmax.com>
+ (2) codes in Steven's book 'network programming'
+
+ 200605 change it to make it IPv6 ready
+
+*/
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
+#include <arpa/inet.h> /* inet(3) functions */
+#include <errno.h>
+
+extern int verbose;
+int delay_sec;
+int delay_usec;
+
+
+/* Set up for catcher's complaint socket */
+#ifndef IPV6
+int complaint_socket(struct sockaddr_in *addr, int port)
+#else
+int complaint_socket(struct sockaddr_in6 *addr, int port)
+#endif
+{
+ int fd, sockaddr_len;
+ sa_family_t family;
+ if (verbose>=2) fprintf(stderr, "in send_socket_ip\n");
+
+ #ifndef IPV6
+ /* IPv4 */
+ sockaddr_len = sizeof(struct sockaddr_in);
+ memset(addr, 0, sockaddr_len);
+ addr->sin_family = AF_INET;
+ family = AF_INET;
+ addr->sin_port = htons(port);
+ /*addr->sin_addr.s_addr = ip; this fx in for init process only
+ This ip will be overwritten later after
+ 1st packet is received. */
+ #else
+ /* IPv6 */
+ sockaddr_len = sizeof(struct sockaddr_in6);
+ memset(addr, 0, sockaddr_len);
+ addr->sin6_family = AF_INET6;
+ family = AF_INET6;
+ addr->sin6_port = htons(port);
+ /* addr->sin_addr.s_addr = ip; see comments above */
+ #endif
+
+ if ((fd = socket(family, SOCK_DGRAM, 0)) < 0){
+ perror("Send socket");
+ exit(1);
+ }
+ return fd;
+}
+
+/* Set up mcast send socket for multicaster based on (char*)cp */
+#ifndef IPV6
+int send_socket(struct sockaddr_in *addr, char * cp, int port)
+#else
+int send_socket(struct sockaddr_in6 *addr, char * cp, int port)
+#endif
+{
+ int fd, sockaddr_len;
+ sa_family_t family;
+ char buf[50];
+ if (verbose>=2) fprintf(stderr, "in send_socket\n");
+
+ #ifndef IPV6
+ /* IPv4 */
+ sockaddr_len = sizeof(struct sockaddr_in);
+ memset(addr, 0, sockaddr_len);
+ addr->sin_family = AF_INET;
+ family = AF_INET;
+ addr->sin_port = htons(port);
+ /*addr->sin_addr.s_addr = inet_addr(cp);*/
+ inet_pton(AF_INET, cp, &addr->sin_addr);
+ /* Print out IP address and port */
+ inet_ntop(AF_INET, &addr->sin_addr, buf, 50);
+ #else
+ sockaddr_len = sizeof(struct sockaddr_in6);
+ memset(addr, 0, sockaddr_len);
+ addr->sin6_family = AF_INET6;
+ family = AF_INET6;
+ addr->sin6_port = htons(port);
+ /*addr->sin_addr.s_addr = inet_addr(cp);*/
+ inet_pton(AF_INET6, cp, &addr->sin6_addr);
+ /* Print out IP address and port */
+ inet_ntop(AF_INET6, &addr->sin6_addr, buf, 50);
+ #endif
+
+ if (verbose>=2)
+ fprintf(stderr, "Creating a send socket to %s:%d\n", buf, port);
+
+ if ((fd = socket(family, SOCK_DGRAM, 0)) < 0){
+ perror("Send socket");
+ exit(1);
+ }
+
+ if ((bind(fd, (const struct sockaddr *)addr, sockaddr_len)) < 0){
+ perror("in send_socket(): bind error (need to change MCAST_ADDR)");
+ exit(1);
+ }
+ return fd; /*send_socket_ip(addr, address, port);*/
+}
+
+/* set up socket on the receiving end */
+#ifndef IPV6
+int rec_socket(struct sockaddr_in *addr, int port)
+#else
+int rec_socket(struct sockaddr_in6 *addr, int port)
+#endif
+{
+ int fd, sockaddr_len;
+ sa_family_t family;
+
+ #ifndef IPV6
+ /* IPv4 */
+ sockaddr_len = sizeof(struct sockaddr_in);
+ memset(addr, 0, sockaddr_len);
+ addr->sin_family = AF_INET;
+ family = AF_INET;
+ addr->sin_port = htons(port);
+ addr->sin_addr.s_addr = htonl(INADDR_ANY);
+ #else
+ sockaddr_len = sizeof(struct sockaddr_in6);
+ memset(addr, 0, sockaddr_len);
+ addr->sin6_family = AF_INET6;
+ family = AF_INET6;
+ addr->sin6_port = htons(port);
+ addr->sin6_addr = in6addr_any; /* RS book: page 92 */
+ #endif
+
+ if((fd = socket(family, SOCK_DGRAM, 0)) < 0){
+ perror("Socket create");
+ exit(1);
+ }
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, NULL, 0);
+
+ if (verbose>=2)
+ fprintf(stderr, "Creating receive socket on port %d\n", port);
+
+ /*if ((bind(fd, addr, sizeof(*addr))) < 0){ MOD by RWM: replaced by next line */
+ if ((bind(fd, (const struct sockaddr *)addr, sockaddr_len)) < 0){
+ perror("in rec_socket(): bind error (need to change PORT)");
+ exit(1);
+ }
+ return fd;
+}
+
+/*
+ set the values of two variables to be used by select()
+ in readable().
+*/
+void set_delay(int secs, int usecs)
+{
+ if (verbose>=2) {
+ if (usecs == -1)
+ fprintf(stderr, "Timeout: set to infinite\n");
+ else
+ fprintf(stderr, "Timeout: set to %d sec + %d usec\n", secs, usecs);
+ }
+ delay_sec = secs;
+ delay_usec = usecs;
+}
+
+void get_delay(int * secs, int * usecs)
+{
+ *secs = delay_sec;
+ *usecs= delay_usec;
+}
+
+/*
+ Check if there is an incoming UDP for a certain amount
+ of time period specified in delay_tv..
+
+ Return 'true' if there is.
+ Return 'false' if no valid UDP has arrived within the time period.
+
+ In multicaster, this is used in read_handle_page() right after
+ send_page() is carried out. As a result, read_handle_complaint() waits
+ for any incoming complaints from any target machines within
+ the specified time period. As it is now, no target machine
+ will send back complaints during the transmission of all the
+ pages for a file. Therefore, read_handle_complaint() serves effectively
+ as a time delay between sending of each page.
+*/
+int readable(int fd)
+{
+ struct timeval delay_tv;
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(fd, &rset);
+
+ /*
+ if microsec == -1 wait forever for a packet.
+ This is used in the beginning when multicatcher is just
+ invoked.
+ */
+ if (delay_usec == -1){
+ return(select(fd + 1, &rset, NULL, NULL, NULL));
+ } else {
+ delay_tv.tv_sec = delay_sec;
+ delay_tv.tv_usec = delay_usec;
+ return (select(fd + 1, &rset, NULL, NULL, &delay_tv));
+ }
+}
+
diff --git a/signal.c b/signal.c
new file mode 100644
index 0000000..5b49b52
--- /dev/null
+++ b/signal.c
@@ -0,0 +1,93 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ The code in this file is copied from
+ Richard Stevens' book
+ "UNIX Network Programming" Chap.22.3
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include "signal.h"
+
+Sigfunc * signal(int signo, Sigfunc *func)
+{
+ struct sigaction act, oact;
+
+ act.sa_handler = func;
+ sigemptyset(&act.sa_mask);
+ act.sa_flags = 0;
+ if (signo == SIGALRM) {
+#ifdef SA_INTERRUPT
+ act.sa_flags |= SA_INTERRUPT; /* SunOS 4.x */
+#endif
+ } else {
+#ifdef SA_RESTART
+ act.sa_flags |= SA_RESTART; /* SVR4, 44BSD */
+#endif
+ }
+ if (sigaction(signo, &act, &oact) < 0)
+ return(SIG_ERR);
+ return(oact.sa_handler);
+}
+/* end signal */
+
+Sigfunc * Signal(int signo, Sigfunc *func) /* for our signal() function */
+{
+ Sigfunc *sigfunc;
+
+ if ( (sigfunc = signal(signo, func)) == SIG_ERR)
+ perror("signal error");
+ return(sigfunc);
+}
+
+int Fcntl(int fd, int cmd, int arg)
+{
+ int n;
+
+ if ( (n = fcntl(fd, cmd, arg)) == -1)
+ perror("fcntl error");
+ return(n);
+}
+
+int Ioctl(int fd, int request, void *arg)
+{
+ int n;
+
+ if ( (n = ioctl(fd, request, arg)) == -1)
+ perror("ioctl error");
+ return(n); /* streamio of I_LIST returns value */
+}
+
+void Sigemptyset(sigset_t *set)
+{
+ if (sigemptyset(set) == -1)
+ perror("sigemptyset error");
+}
+
+void Sigaddset(sigset_t *set, int signo)
+{
+ if (sigaddset(set, signo) == -1)
+ perror("sigaddset error");
+}
+
+void Sigprocmask(int how, const sigset_t *set, sigset_t *oset)
+{
+ if (sigprocmask(how, set, oset) == -1)
+ perror("sigprocmask error");
+}
diff --git a/signal.h b/signal.h
new file mode 100644
index 0000000..bf309f8
--- /dev/null
+++ b/signal.h
@@ -0,0 +1,32 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include <sys/types.h>
+#include <stdio.h>
+#include <signal.h>
+#include <unistd.h>
+/******* on linux: ioctl() is defined in sys/ioctl.h instead of unistd.h as on SunOS *****/
+#include <sys/ioctl.h>
+#include <fcntl.h>
+
+typedef void Sigfunc(int); /* for signal handlers */
+
diff --git a/timing.c b/timing.c
new file mode 100644
index 0000000..d4baa15
--- /dev/null
+++ b/timing.c
@@ -0,0 +1,109 @@
+/*
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2005 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#include <sys/time.h>
+#include <stdio.h>
+
+#define N 501 /* effectively set the maximum time in rtt_hist to be 500 msec */
+
+extern int verbose;
+
+/* for timing */
+struct timeval tv0, tv1;
+unsigned long usec_acc, sec_acc; /* accumulator of timing */
+unsigned int rtt_hist[N]; /* rtt_hist[i] = count of rtt within (i, i+1) */
+
+void refresh_timer()
+{
+ usec_acc = 0;
+ sec_acc = 0;
+}
+
+void start_timer()
+{
+ struct timezone tz;
+ gettimeofday(&tv0, &tz);
+}
+
+void end_timer()
+{
+ struct timezone tz;
+ gettimeofday(&tv1, &tz); /* end timer -------- */
+}
+
+void update_time_accumulator()
+{
+ if (tv1.tv_usec<tv0.tv_usec) {
+ sec_acc += (tv1.tv_sec - tv0.tv_sec - 1);
+ usec_acc += (1000000 + tv1.tv_usec - tv0.tv_usec);
+ } else {
+ sec_acc += (tv1.tv_sec - tv0.tv_sec);
+ usec_acc += (tv1.tv_usec - tv0.tv_usec);
+ }
+}
+
+double get_accumulated_time()
+{
+ double sec = sec_acc;
+ sec += (usec_acc / 1e6);
+ return sec;
+}
+
+double get_accumulated_usec()
+{
+ double usec = usec_acc;
+ usec += (sec_acc*1e6);
+ return usec;
+}
+
+void init_rtt_hist()
+{
+ int i;
+ for(i=0; i<N; ++i) rtt_hist[i] = 0;
+}
+
+void update_rtt_hist(unsigned int rtt)
+{
+ unsigned int index;
+ index = rtt / 1000;
+ if (index>(N-2)) index = N-1;
+ rtt_hist[index]++;
+}
+
+void pr_rtt_hist()
+{
+ int i;
+ fprintf(stderr, "rtt histogram\n");
+ fprintf(stderr, "msec counts\n");
+ fprintf(stderr, "---- --------\n");
+ for(i=0; i<N; ++i) {
+ if (verbose<=1 && i>10) continue;
+ if (rtt_hist[i] != 0) {
+ fprintf(stderr, "%4d %u\n", i, rtt_hist[i]);
+ }
+ }
+}
+
+unsigned int pages_wo_ack()
+{
+ return rtt_hist[N-1];
+}
diff --git a/trFilelist.c b/trFilelist.c
new file mode 100644
index 0000000..8f65796
--- /dev/null
+++ b/trFilelist.c
@@ -0,0 +1,449 @@
+/*
+ Copyright (C) 2008 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+ Copyright (C) 2006 Renaissance Technologies Corp.
+ main developer: HP Wei <hp@rentec.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING.
+ If not, write to the Free Software Foundation,
+ 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+/***this code parse the synclist file generated by rsync in dry-run mode.
+ The minimum options for rsync is : -avW --dry-run --delete
+ e.g.
+ /usr/local/bin/rsync --rsync-path=/usr/local/bin/rsync
+ -avW --dry-run --delete
+ /src/path/ dest_machine:/target/path/ > output 2>&1
+A typical output of rsync may look like this:
+Client is very old version of rsync, upgrade recommended.
+building file list ... done
+xyz -> ./sub1/xyz
+file1
+fn
+fn1
+j
+sub1/hardlink_to_file1
+sub1/path/testfile
+sub1/xyz
+sent 337 bytes read 44 bytes 254.00 bytes/sec
+total size is 320751046 speedup is 841866.26
+
+This code does the following three things.
+(1) skip the lines before 'done' and after 'wrote'
+(2) output all directories and file_path
+ e.g
+ for an entry: sub1/path/testfile, the output is
+ sub1
+ sub1/path
+ sub1/path/testfile
+(3) xyz -> ./sub1/xyz
+ the output is
+ xyz
+(4) If file1 and sub1/hardlink are hardlinked
+ the output is
+ file1
+ sub1/hardlink file1
+
+For the example output above, the output of this code is:
+
+xyz
+file1
+fn
+fn1
+j
+sub1
+sub1/hardlink_to_file1 file1
+sub1/path
+sub1/path/testfile
+sub1/xyz
+
+***/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <string.h>
+#include <limits.h> /* to define PATH_MAX */
+#include <sys/stat.h>
+
+#define TRUE 1
+#define FALSE 0
+
+struct string_list {
+ int capacity;
+ char ** endp;
+ char ** str;
+};
+
+void init_string_list(struct string_list * str_ptr, int n)
+{
+ str_ptr->str = malloc(n * sizeof(void*));
+ str_ptr->capacity = n;
+ str_ptr->endp = str_ptr->str;
+}
+
+void grow_string_list(struct string_list * slp)
+{
+ int new_capacity = 2 * slp->capacity;
+ char ** old_ptrs = slp->str;
+ char ** new_ptrs;
+ char ** newp = malloc(new_capacity * sizeof(void *));
+ new_ptrs = newp;
+
+ while (old_ptrs < slp->endp) {
+ *new_ptrs++ = *old_ptrs++;
+ }
+
+ free(slp->str);
+ slp->str = newp;
+ slp->endp= new_ptrs;
+ slp->capacity = new_capacity;
+}
+
+void append_string_list(char * str, struct string_list * slp)
+{
+ if (slp->endp - slp->str == slp->capacity) grow_string_list(slp);
+ *slp->endp = strdup(str);
+ (slp->endp)++;
+}
+
+/*************** change to return index ****/
+int find_string(char * str, struct string_list * slp)
+{
+ /* find if str is in the list */
+ int i;
+ int n = slp->endp - slp->str;
+
+ for(i=0; i<n; ++i) {
+ if (strcmp((slp->str)[i], str)==0) return i;
+ }
+ return -1;
+}
+
+/* find if a string in string-list is a sub-string of str */
+int has_sub_string(char * str, struct string_list *slp)
+{
+ int i;
+ int n = slp->endp - slp->str;
+
+ for(i=0; i<n; ++i) {
+ if (strncmp(str, (slp->str)[i], strlen((slp->str)[i]))==0) return i;
+ }
+ return -1;
+}
+
+/* find if the str is a substr of those in slp */
+int has_newdir(char *str, struct string_list *slp)
+{
+ int i;
+ int n = slp->endp - slp->str;
+
+ for(i=0; i<n; ++i) {
+ if (strncmp((slp->str)[i],str, strlen(str))==0) return i;
+ }
+ return -1;
+}
+
+struct uint_list {
+ int capacity;
+ unsigned int * endp;
+ unsigned int * d;
+};
+
+void init_uint_list(struct uint_list * uil_ptr, int n)
+{
+ uil_ptr->d = malloc(n * sizeof(unsigned int));
+ uil_ptr->capacity = n;
+ uil_ptr->endp = uil_ptr->d;
+}
+
+void grow_uint_list(struct uint_list * uilp)
+{
+ int new_capacity = 2 * uilp->capacity;
+ unsigned int * old_ptrs = uilp->d;
+ unsigned int * new_ptrs;
+ unsigned int * newp = malloc(new_capacity * sizeof(unsigned int));
+ new_ptrs = newp;
+
+ while (old_ptrs < uilp->endp) {
+ *new_ptrs++ = *old_ptrs++;
+ }
+
+ free(uilp->d);
+ uilp->d = newp;
+ uilp->endp= new_ptrs;
+ uilp->capacity = new_capacity;
+}
+
+void append_uint_list(unsigned int data, struct uint_list * uilp)
+{
+ if (uilp->endp - uilp->d == uilp->capacity) grow_uint_list(uilp);
+ *uilp->endp = data;
+ (uilp->endp)++;
+}
+/*************** change to return index ****/
+int find_unit(unsigned int data, struct uint_list * uilp)
+{
+ /* find if data is in the list */
+ int i;
+ int n = uilp->endp - uilp->d;
+
+ for(i=0; i<n; ++i) {
+ if ((uilp->d)[i] == data) return i;
+ }
+ return -1;
+}
+
+struct string_list file_list;
+struct uint_list ino_list;
+struct string_list dir_list;
+struct string_list softlink_list; /* for (a) */
+struct string_list newdir_list; /* for (b) */
+
+void strip(char * str)
+{
+ /* remove trailing \n and spaces */
+ char *pt;
+ char *pc = &str[strlen(str)-1];
+ while (*pc == ' ' || *pc == '\n') *(pc--) = '\0';
+ /* 20080317 remove leading spaces */
+ pt = pc = &str[0];
+ while (*pc == ' ') ++pc;
+ if (pc != pt) {
+ while (*pc != '\0') *pt++ = *pc++;
+ *pt = '\0';
+ }
+}
+
+void output_subs(char * str)
+{
+ return; /*************************** testing ***************/
+ /* to do (2) indicated in the above */
+ /********
+ char * pc;
+ char subs[PATH_MAX];
+ pc = strstr(str, "/");
+ if (!pc) return;
+
+ while (pc) {
+ strncpy(subs, str, pc-str);
+ subs[pc-str] = '\0';
+ if (find_string(subs, &dir_list)<0) {
+ printf("%s\n", subs);
+ append_string_list(subs, &dir_list);
+ }
+ pc = strstr(pc+1, "/");
+ }
+ ************/
+}
+
+/*** (a)
+ get those softlinks that points to a directory
+ this is to deal with the following scenario
+ previous structure
+ dir_path (a directory)
+ db (a directory)
+
+ newly updated structure on master
+ dir_path -> db
+ db
+
+ rsync --dry-run generates
+ dir_path -> db [a link is done on target]
+ deleting dir_path/sub/filename1 [wrong file gets removed ]
+ deleting dir_path/sub/filename2...
+
+ file_operations.c does this when dir_path -> db is due
+ delete dir_path (rm -rf)
+ make the softlink
+ But then the following delete will have undesired deletion.
+
+ ------------------------------------------------------------
+
+ (b)
+ t0 name -> xyz name -> xyz (target)
+ t1 name/ name -> xyz
+
+ rsync generates
+ name/ update_directory() won't have effect
+ name/f1 delivered to wrong place
+ name/f2
+ deleting name too late
+ ** the deletion should be done before not after.
+ For now, I will fail this code for this situation.
+
+***/
+void get_dir_softlinks(char *filename, char * basedir) {
+ FILE * fd;
+ char line[PATH_MAX];
+ struct stat st;
+
+ if ((fd = fopen(filename, "r")) == NULL) {
+ fprintf(stderr, "Cannot open file -- %s \n", filename);
+ exit(-1);
+ }
+
+ while (1) { /* for each line in the file */
+ char *pc;
+ char fn[PATH_MAX];
+
+ if (fgets(line, PATH_MAX, fd)==NULL) break;
+ strip(line);
+ if (strlen(line) == 0) continue; /* skip blank line */
+
+ /* the softlink case is indicated by -> */
+ pc= strstr(line, " -> ");
+ if (pc) { /* it is a softlink */
+ *pc = '\0';
+ /* check if it is a directory */
+ sprintf(fn, "%s/%s", basedir, line);
+
+ /* check if the link-target is a directory */
+ if (stat(fn, &st)<0) continue; /* We skip this bad entry - no longer exist */
+
+ if (S_ISDIR(st.st_mode)) {
+ append_string_list(line, &softlink_list);
+ }
+ } else { /* not a softlink --> find if it is a directory */
+ /* find a line without ' ' and with trailing '/' */
+ pc = strstr(line, " "); /* the first space */
+ if (!pc) {
+ char * plast = &line[0] + strlen(line) - 1;
+ if (*plast == '/') {
+ append_string_list(line, &newdir_list);
+ }
+ }
+ }
+ }
+
+ fclose(fd);
+}
+
+
+int main(int argc, char * argv[])
+{
+ char * filename;
+ char * basedir;
+ FILE *fd;
+ char line[PATH_MAX];
+
+ if (argc < 3) {
+ fprintf(stderr, "Usage: trFilelist synclist_filename basedir\n");
+ exit(-1);
+ }
+
+ filename = argv[1];
+ basedir = argv[2];
+
+ init_string_list(&file_list, 10);
+ init_uint_list(&ino_list, 10);
+ init_string_list(&dir_list, 100);
+ init_string_list(&softlink_list, 10);
+ init_string_list(&newdir_list, 100);
+
+ get_dir_softlinks(filename, basedir);
+
+ if ((fd = fopen(filename, "r")) == NULL) {
+ fprintf(stderr, "Cannot open file -- %s \n", filename);
+ return -1;
+ }
+
+ while (1) { /* for each line in the file */
+ char *pc;
+ char fn[PATH_MAX];
+ struct stat st;
+ int newdir_flag;
+
+ if (fgets(line, PATH_MAX, fd)==NULL) break;
+ strip(line);
+ if (strlen(line) == 0) continue; /* skip blank line */
+ if (strcmp(line, ".")==0) continue;
+ if (strcmp(line, "./")==0) continue;
+
+ /* first we look for deleting entry */
+ if (strncmp(line, "deleting ", 9)==0) {
+ /* deleting (directory) file_path */
+ char * p1, *p2, *pf;
+
+ p1 = strstr(line, " "); /* the first space */
+ p2 = strstr(p1+1, " "); /* deleting directory filepath * 20070912 this is old */
+ pf = (p2) ? p2+1 : p1+1;/* it's always p1+1 */
+
+ newdir_flag = has_newdir(pf, &newdir_list);
+
+ if ((has_sub_string(pf, &softlink_list)<0) && newdir_flag<0) {
+ /* see comments above get_dir_softlinks() */
+ printf("deleting %s\n", pf);
+ } else if (newdir_flag>=0) { /* temporary action */
+ /*** we can simply skip this block later. 20070912 ***/
+ /***/
+ fprintf(stderr, "CRITICAL ERROR: An old softlink has been changed to a directory!\n");
+ fprintf(stderr, " For now, we crash this code for human intervention\n");
+ fprintf(stderr, " line= %s\n", line);
+ exit(-1);
+ /***/
+ }
+
+ continue;
+ }
+
+ /* the softlink case is indicated by -> */
+ pc= strstr(line, " -> ");
+ if (pc) {
+ *pc = '\0';
+ output_subs(line);
+ printf("%s\n", line);
+ continue;
+ }
+
+ /* if rsync's -H is turned on, the output may contain
+ file => tar_hardlink_file (relative address)
+ */
+ pc= strstr(line, " => ");
+ if (pc) {
+ *pc = '\0';
+ output_subs(line);
+ printf("%s %s\n", line, pc+4);
+ continue;
+ }
+
+ /* the rest of the entries should be valid paths */
+ sprintf(fn, "%s/%s", basedir, line);
+ if (lstat(fn, &st)<0) continue; /* We skip this bad entry -
+ (1) the header and tail lines
+ (2) perhaps the file no longer exists */
+
+ /* is this a hardlink? */
+ if (st.st_nlink > 1) {
+ int index;
+ output_subs(line);
+ if ((index = find_unit((unsigned int)st.st_ino, &ino_list))<0) {
+ append_uint_list((unsigned int)st.st_ino, &ino_list);
+ append_string_list(line, &file_list); /* relative path */
+ printf("%s\n", line);
+ } else {
+ printf("%s %s\n", line, file_list.str[index]);
+ }
+ continue;
+ }
+
+ /* all others */
+ output_subs(line);
+ printf("%s\n", line);
+ } /* end of one line */
+
+ fclose(fd);
+ return 0;
+}