From a5309fed914fdaa7697f2d369e7dcd02309063ab Mon Sep 17 00:00:00 2001 From: Guillaume Horel Date: Wed, 4 Nov 2015 12:30:44 -0500 Subject: initial import --- GPL.License | 280 ++++++++++++++++++ Makefile | 70 +++++ Makefile.Sun | 71 +++++ README | 302 +++++++++++++++++++ README.more | 353 ++++++++++++++++++++++ backup.c | 108 +++++++ cmdToTarget.py | 43 +++ complaint_sender.c | 158 ++++++++++ complaints.c | 579 ++++++++++++++++++++++++++++++++++++ file_operations.c | 800 ++++++++++++++++++++++++++++++++++++++++++++++++++ global.c | 35 +++ id_map.c | 74 +++++ main.h | 189 ++++++++++++ mrsync.py | 284 ++++++++++++++++++ mrsync_config.py | 67 +++++ multicaster.c | 582 ++++++++++++++++++++++++++++++++++++ multicatcher.c | 181 ++++++++++++ page_reader.c | 426 +++++++++++++++++++++++++++ parse_synclist.c | 320 ++++++++++++++++++++ proto.h | 182 ++++++++++++ rtt.c | 258 ++++++++++++++++ rttcatcher.c | 118 ++++++++ rttcomplaint_sender.c | 103 +++++++ rttcomplaints.c | 270 +++++++++++++++++ rttmain.h | 126 ++++++++ rttmissings.c | 93 ++++++ rttpage_reader.c | 188 ++++++++++++ rttproto.h | 116 ++++++++ rttsends.c | 144 +++++++++ sends.c | 329 +++++++++++++++++++++ set_catcher_mcast.c | 142 +++++++++ set_mcast.c | 160 ++++++++++ setup_socket.c | 242 +++++++++++++++ signal.c | 93 ++++++ signal.h | 32 ++ timing.c | 109 +++++++ trFilelist.c | 449 ++++++++++++++++++++++++++++ 37 files changed, 8076 insertions(+) create mode 100644 GPL.License create mode 100644 Makefile create mode 100644 Makefile.Sun create mode 100644 README create mode 100644 README.more create mode 100644 backup.c create mode 100755 cmdToTarget.py create mode 100644 complaint_sender.c create mode 100644 complaints.c create mode 100644 file_operations.c create mode 100644 global.c create mode 100644 id_map.c create mode 100644 main.h create mode 100755 mrsync.py create mode 100644 mrsync_config.py create mode 100644 multicaster.c create mode 100644 multicatcher.c create mode 100644 page_reader.c create mode 100644 parse_synclist.c create mode 100644 proto.h create mode 100644 rtt.c create mode 100644 rttcatcher.c create mode 100644 rttcomplaint_sender.c create mode 100644 rttcomplaints.c create mode 100644 rttmain.h create mode 100644 rttmissings.c create mode 100644 rttpage_reader.c create mode 100644 rttproto.h create mode 100644 rttsends.c create mode 100644 sends.c create mode 100644 set_catcher_mcast.c create mode 100644 set_mcast.c create mode 100644 setup_socket.c create mode 100644 signal.c create mode 100644 signal.h create mode 100644 timing.c create mode 100644 trFilelist.c diff --git a/GPL.License b/GPL.License new file mode 100644 index 0000000..960fe74 --- /dev/null +++ b/GPL.License @@ -0,0 +1,280 @@ + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc. + 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Library General Public License instead.) You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. + + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. + + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. (Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. (This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. + +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties to +this License. + + 7. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + + END OF TERMS AND CONDITIONS diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9de65aa --- /dev/null +++ b/Makefile @@ -0,0 +1,70 @@ +# ----- start of system dependent section ----- + +INSTALL = cp -p + +SUNFLAG = # -D_SUN on Solaris machines +IPV6FLAG = # -DIPV6 for IPv6 +DEBUG = # -g -ggdb +CFLAGS = -O ${DEBUG} -Wall ${SUNFLAG} ${IPV6FLAG} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 +#LIBS = -lsocket # for Solaris +LIBS = # there is no special lib needed, unless your system put the lib in non-standard place + +# The directory to install mrsync and others in. +bindir = /usr/local/bin + +# ----- end of system dependent section ------- + +CLEANFILES = *.o *~ + +PROGS = multicaster multicatcher rtt rttcatcher trFilelist +SCR = mrsync.py mrsync_config.py cmdToTarget.py +OBJ1 = multicaster.o multicatcher.o \ + parse_synclist.o sends.o complaints.o \ + complaint_sender.o page_reader.o file_operations.o backup.o \ + set_catcher_mcast.o set_mcast.o + +OBJ4 = rtt.o rttsends.o rttcomplaints.o \ + rttcatcher.o rttpage_reader.o rttcomplaint_sender.o rttmissings.o + +all: ${PROGS} + +install: ${PROGS} + ${INSTALL} ${PROGS} ${SCR} ${bindir} + +# common files +signal.o: signal.h + +# multicasting +${OBJ1}: main.h proto.h + +multicaster: multicaster.o global.o setup_socket.o set_mcast.o \ + parse_synclist.o \ + sends.o complaints.o backup.o \ + timing.o signal.o id_map.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +multicatcher: multicatcher.o global.o setup_socket.o set_catcher_mcast.o \ + page_reader.o complaint_sender.o \ + file_operations.o signal.o timing.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# for rtt and rttcatcher +${OBJ4}: rttmain.h rttproto.h + +rtt: rtt.o setup_socket.o set_mcast.o \ + rttsends.o rttcomplaints.o timing.o signal.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +rttcatcher: rttcatcher.o setup_socket.o set_catcher_mcast.o \ + rttpage_reader.o rttcomplaint_sender.o rttmissings.o \ + signal.o timing.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# misc +trFilelist: trFilelist.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# to clean up +clean: + rm -f ${PROGS} ${CLEANFILES} + diff --git a/Makefile.Sun b/Makefile.Sun new file mode 100644 index 0000000..8fe57da --- /dev/null +++ b/Makefile.Sun @@ -0,0 +1,71 @@ +# ----- start of system dependent section ----- + +INSTALL = cp -p + +SUNFLAG = -D_SUN # -D_SUN on Solaris machines +IPV6FLAG = # -DIPV6 for IPv6 +DEBUG = # -g -ggdb +CC = gcc-2.95.3.ren # 32-bit compiler +CFLAGS = -O ${DEBUG} -Wall ${SUNFLAG} ${IPV6FLAG} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 +LIBS = -lsocket # for Solaris +#LIBS = /usr/local/mtools/nova/lib/lea.Linux.o # for monster clusters (32-bit lib) + +# The directory to install mrsync and others in. +bindir = /usr/local/bin + +# ----- end of system dependent section ------- + +CLEANFILES = *.o *~ + +PROGS = multicaster multicatcher rtt rttcatcher trFilelist +SCR = mrsync.py mrsync_config.py cmdToTarget.py +OBJ1 = multicaster.o multicatcher.o \ + parse_synclist.o sends.o complaints.o \ + complaint_sender.o page_reader.o file_operations.o backup.o \ + set_catcher_mcast.o set_mcast.o + +OBJ4 = rtt.o rttsends.o rttcomplaints.o \ + rttcatcher.o rttpage_reader.o rttcomplaint_sender.o rttmissings.o + +all: ${PROGS} + +install: ${PROGS} + ${INSTALL} ${PROGS} ${SCR} ${bindir} + +# common files +signal.o: signal.h + +# multicasting +${OBJ1}: main.h proto.h + +multicaster: multicaster.o global.o setup_socket.o set_mcast.o \ + parse_synclist.o \ + sends.o complaints.o backup.o \ + timing.o signal.o id_map.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +multicatcher: multicatcher.o global.o setup_socket.o set_catcher_mcast.o \ + page_reader.o complaint_sender.o \ + file_operations.o signal.o timing.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# for rtt and rttcatcher +${OBJ4}: rttmain.h rttproto.h + +rtt: rtt.o setup_socket.o set_mcast.o \ + rttsends.o rttcomplaints.o timing.o signal.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +rttcatcher: rttcatcher.o setup_socket.o set_catcher_mcast.o \ + rttpage_reader.o rttcomplaint_sender.o rttmissings.o \ + signal.o timing.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# misc +trFilelist: trFilelist.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# to clean up +clean: + rm -f ${PROGS} ${CLEANFILES} + diff --git a/README b/README new file mode 100644 index 0000000..d665370 --- /dev/null +++ b/README @@ -0,0 +1,302 @@ + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +20060416 version 3.0 + This is a major update from previous version 2.1. New features include: + -- large file support + multicaster can now transfer file whose size is larger than 2**31-1. + -- platform independence (between linux, unix) + We have tested this independence from a Sun to a Linux machine. + -- catching slow machine as the feedback monitor dynamically + This significantly reduces the number of occurence of SIT-OUT-RECEIVING-FILE cases. + -- removing meta-file-info + Since our basic algorithm is different from that of Aaron's, the meta-file-info + is redundant for in our case. + -- backup feature (as in rsync) + If the target machines are NFS data servers, backup feature is absolutely + necessary. The feature is implemented in the low level multica* codes, + I will incorporate it into mrsync.py in the next month or so. + -- mcast IP address and port options + They are now tested and are working. + This allows multiple multicasting sessions running simultaneously. + -- code cleanup in both mrsync.py and *.c + -- code fixing for 64bit arch; + tested on Debian 64 bit arch by Nicolas Marot in France + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwanranted SIT-OUT case + -- code provision for IPv6 (but not tested yet) +version 3.1.1 + -- minor bug fixes. + -- add checking in multicatcher to check if the + system is ready for write. + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. +version 3.2.0 + -- monitor change improvement + -- handshake improvement (e.g. seq #) + -- if one machine skips a file, all will NOT close() +version 4.0.0 + -- previously, missing page report is sent back one page + at a time. I change it to ~60K pages at a time. This + drastically reduces the network traffic. + Performance (syncing time) is in general improved. + -- clean up scheme in choosing a monitor machine. + -- misc code cleaning up and minor adjustments. + +version 4.0.1 + -- Dave Close at ThalesGroup in CA points out + the type of c in "c = getopt()" should have been "int c" + [ It had been "char c". ] + +The mrsync package includes the following files: +-------------------- +README -- this file (usage example in the end.) +README.more -- another readme. +GPL.License -- legal stuff +mrsync.py -- the driver script that glues everything together +mrsync_config.py -- system dependent configuration files +cmdToTarget.py -- module to send command to remote machines +Makefile.Sun +Makefile.linux +backup.c +complaint_sender.c +complaints.c +file_operations.c +global.c +id_map.c +multicaster.c +multicatcher.c +page_reader.c +parse_synclist.c +rtt.c +rttcatcher.c +rttcomplaint_sender.c +rttcomplaints.c +rttmissings.c +rttpage_reader.c +rttsends.c +sends.c +set_catcher_mcast.c +set_mcast.c +setup_socket.c +signal.c +timing.c +trFilelist.c +main.h +rttproto.h +proto.h +signal.h +rttmain.h + +------------ +To install, +------------ +(1) copy these files to a directory on the target and + master machines. + (All target machines need at least a copy of the + multicatcher.) + Or the best (efficient) solution is to install the package + on a common file server to which all machines can access. +(2) Depending on which platform you are in, + mv Makefile.Sun (or MakeFile.linux) into Makefile. +(3) edit the system dependent section in Makefile. + Especially, the bindir variable. + It is used in the 'install' to copy the executable + binaries into the destination directory. +(4) make + to compile and generate the executables. (the warnings for format are harmless). +(5) make install + to copy the executables into $bindir. +(6) edit mrsync_config.py + change the paths for your system. + mrsync_config.py is read by mrsync.py +(7) check if your system has rsync installed + in the directory specified in rsyncPath and remote_rsyncPath + in the file mrsync_config.py +(8) check if your system has python installed. + Python, like perl, is included in most linux distribution. + As distributed, mrsync.py invokes python through the 1st line + !/usr/bin/env python + If this does not work for you system, you replace + the 1st line of mrsync.py with correct one. + +------------------------------ +Usage of mrsync.py +------------------------------ + +(1) the function of ping -- + This is removed from multicaster. + Instead, please use rtt which gives more useful info. + See 'Usage of rtt' below. + +(2) mrsync.py -m /dirX/target.list + -s /dirY/data + -t /dirZ/data + + This is a routine job carried out at Renaissance Technologies Corp. + The file target.list lists the names of all target machines (one name per line). + + Before carrying out multicasting, mrsync.py invokes rsync, + to find those files in the source directory (-s), + compared with the target directory (-t) in the first target machine, + that need to be transfered to all target machines. + If -t is not specified, then mrysnc assumes that the dest_path + on target machines is same as that in -s. + + mrsync.py uses rsync with the following minimum options: + --rsh=rsh -avW --dry-run --delete + This should work in most of routine cases. + This minumum option is specified in mrsync_config.py. + I did not test if other combination of options will work + so I would advise that you do not change the minimum option. + + However, you can add additional options on top of that minimum + options. Use '-o' in mrsync.py + e.g. + mrsync2.py -m list -s /path/sync/test -t /dest/sync/test -v 1 -o '-H' + This turns on --hard-link option in rsync. But hardlink is very tricky + to deal with. The minumum options supplied in mrsync_config.py is the one + that we find works pretty good in terms of dealing with hardlinks. + I do find that some rare sequence of syncing sessions requires that I repeat + another round of mrsync.py with additional + -o '-H' to set the hardlinks on the targets. + + The output of rsync is stored in a tmp file, which in turn gets + parsed by trFilelist. trFilelist translates the list of files into + a specific format for multicaster. + The reason for this two step process is to remove direct dependence of + multicaster on rsync. + The gain is that we can now do the case as shown in (3). + +(3) mrsync.py -m machine1,machine2 + -s /dirB/data/ + -l *.c,subs/foo*,sub2/path/[0-9][0-9].ext + + Here have two new things as compared with the case in (2). + First, the two target machines are listed directly + in -m option in the form of csv list + (instead of listing them in a machine_list file). + Second, the -l option is specified. + -l option lists the files that we want to be synced. + i.e. with this option, we bypass the rsync. + In this example, we want to sync + all /dirB/data/*.c files, + all /dirB/data/subs/foo* files, + and all /dirB/data/sub2/path/[0-9][0-9].ext + The pattern specified should be as those recognizable by 'ls'. + + Obviously, this usage is convenient for syncing small number of + specific files to a couple of machines. + +(4) mrsync.py -m /dirA/target_list + -s /dirB/data + -A 239.255.70.88 + -P 8010 + + Same functionality as in (2). + But here we specify the multicast IP address and port number + for use by both multicatcher and multicaster. + + This allows multiple mrsync sessions to run simultaneously + as long as each session uses its own mcast_address and port_number. + [ No overlap between sessions. ] + +(5) mrsync.py -m /dirA/target_list + -s /path + -X + Same functionality as in (2). + But (2) is a normal sync. This one invokes the -X. + In a normal sync, a given file (let's say /path/foo) is synced + with two steps, + (a) sync the file to /path/foo.tmp + (b) mv /path/foo.tmp /path/foo + With -X option, the same file is synced with three steps, + (a) rm /path/foo + (b) sync the file to /path/foo.tmp + (c) mv /path/foo.tmp /path/foo + This is useful when the target disk is almost full, and + could not afford to have foo.tmp and foo co-existing. + +(6) By default, mrsync.py prints out only very essential messages. + If you want to see more messages (related to some detailed status including + a rtt_histogram at the end of the session), + you can spcify '-v 1' in the mrsync.py command line argument list. + '-v 2' will print out even more detailed messages --- for debugging purpose! + +(7) other options of mrsync.py + type 'mrsync.py' and see them for yourself. + +-------------------------------------------------------------------------------- +Usage of rtt +---------------- +rtt is a utility for testing between master and one machine. +Its main function is to report the (histogram) distribution of the +round-trip-time interval for pages sent. +I use it to ping a machine +and to measure the typical rtt time and to see if the network is +connected properly. System administrators can use the rtt time info +to turn up the network. + +Example usage: + +(1) on master, I invoke rtt and it gives the report as follows. + +master:~ 1>rtt -r ssh -m target -n 1000 -s 64000 + +using ssh to invoke rttcatcher on target +Sun Apr 16 17:36:58 EDT 2006 +ssh target '/path/rttcatcher -a 239.255.67.200 -p 7900 1>/dev/null 2>/dev/null & echo $!' +remote pid = 27283 +Sending data... +Missing pages = 0 ( 0.00%) out of total pages = 1000 +rtt histogram +msec counts +---- -------- + 1 989 + 2 3 + 3 7 + 4 1 + + +The option -m specifies the name of the target machine. +-n specifies the number of pages that we want to send. +-s specifies the size of each page. (maximum = 64512) + +In this example, the total number of pages sent is 1000 +which is the sum of the counts in the rtt historgram table. +The histogram shows that the majority of rtt is around 1-2 msec. +If the network is really busy, the distribution of the histogram +will be broadened (instead of concentrated around one particular value). + +(2) more options for rtt. + type 'rtt' and see them on the screen. + +-------------------------------------------------------------------------------- + +HP Wei +hp@rentec.com +Renaissance Technologies Corp. + + diff --git a/README.more b/README.more new file mode 100644 index 0000000..3a8f7e1 --- /dev/null +++ b/README.more @@ -0,0 +1,353 @@ + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + ++------------+ +| CHANGE LOG | ++------------+ +Jul - Oct 2008 -- version 4.0 + -- previously, missing page report is sent back one page + at a time. I change it to ~60K pages at a time. This + drastically reduces the network traffic. + -- clean up scheme in choosing a monitor machine. +Feb - May 2006 -- version 3.0 + verision 3.0.0 major update + -- large file support + -- platform independence (between linux, unix) + -- backup feature (as in rsync) + -- removing meta-file-info + We transmit the file_stat info right before + the file is being synced. And the file-stat info + is transfered by ascii format to avoid byte-ordering + translation. + The orignal meta_data is not necessay under this scheme. + -- catching slow machine as the feedback monitor + [ the signal_handler scheme in version 2.0 is + removed. ] + -- mcast options + version 3.0.[1-9] bug fixes + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwanranted SIT-OUT cases. + -- tested on Debian 64-bit arch by Nicolas Marot in France + Nicolas also translated mrsync.py to mrsync.sh + (nicolas.marot@gmail.com) + version 3.1.0 + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. + +June 9, 2005 -- version 2.1 + Clint Byrum at clint@careercast.com asks about the exit status + of multicaster, which is changed so that it exits whenever + there are bad_machines or files_sat_out. +May 2005 -- Second version + The major changes are: + (1) implementing congestion control so that mrsync is more + net-traffic-friendly. + (2) using python script as the glue to put everything together. + i.e. the previous mrsync.c is replaced by mrsync.py + Other changes collected over the years through interactions with + users include: + (1) adding options to change the default IP address for multicast, + and the default port number for flow control. + This was suggested by Robert Dack + (2) replacing memory mapped file IO with the usual + seek() and write() sequence. + This change was echoed by Clint Byrum + (3) adding verbose control so that by default mrsync prints + only essential info instead of detailed status report. + This was suggested by Clint Byrum + +Jan 2002 -- First version uploaded to http://freshmeat.net/projects/mrsync/ + The tar file is in ftp://felder.org/pub/mrsync.tar + ++-----------------+ +| MRSYNC vs RSYNC | ++-----------------+ +mrsync is a package that consists of utilities for transfering +a bunch of files from a master machine to multiple target +machines simultaneously +by using the multicasting capability in the UNIX system. +The name 'mrsync' is inspired by the +popular utility 'rsync' for synchronizing files between +two machines. However, beyond this similarity in the +functionality, mrsync is fundamentally different from rsync +in three areas. +(1) rsync uses TCP while mrsync needs UDP in order + to use the multicasting part of UNIX's socket communication. + The former limits the data commuinication to one-to-one-machine + whereas the latter allows one-to-many. + UDP has, however, no built in flow control. As a result, + the major part of mrsync + (more precisely, the multicaster and multicatcher), + is devoted to synchronizing the data flow. +(2) For a given file, + rsync transfers optionally only those parts in the file + that are different + in the two versions on the master and the target machine. + This saves time, esp on a slow network. + mrsync, in contrast, transfers the whole content of a file + to all targets at one time. The time gain of mrsync comes from + its concurrency as described in (3). +(3) Replicated data servers have become poplular to serve + thousands of clients. Using rsync to replicate the data + to hundreds of disks is very time consuming. The total + time it takes is proportional to the number of target + disks. In contrast, mrsync scales much better because + it puts the contents of the file on the network only one + time. e.g. we have used mrsync to transfer 140GB of data + to 100 machines in a 1Gbit LAN in about 4-6 hours. + +This project has started in 2001. We have been using the tool +every day. +Recently, I have upgraded this tool to handle large-files, +to make it platform independent, and to make it 64bit ready. +The current version is 4.0.0 which is stable. +It has been used by many people in the UNIX community. +Lately, people in France has helped me test the code on 64bit arch. +They are proposing to make it a standard package in the Debian distribution. + +mrsync was originally posted on Freshmeat.net. But the link there +points to one of our company's disks. We would like to find a place +on SourceForge.net for this package. + + + ++-------------------+ +| HISTORY OF MRSYNC | ++ ------------------+ +The project of mrsync stemmed from the necessity to transfer +many files to hundreds of machines running Linux at Renaissance +Technologies Corp. Looking into the Open Source Community, we found +a preliminary utility codes of multicasting written by Aaron Hillegass. +Many unsuccessful test-runs on a huge amount of data files, however, +led us to embark on an overhaul of the code. +Most of the following items were inherited and bug-fixed from +the original codes. +* The low level functions that + interact with UNIX's multicasting sockets. +* Meta_data -- the essential info about a file which the master + machine will first transmit to the target machines. + [ removed in 2006 (see above change log)]. +* Division of a file into many 'pages'. +* The idea of maintaining a missing page flag. +* The idea of a multicaster and multicatcher loop. + +In this mrsync, we develop two new critical elements: +flow-control message communication conducted by the multicaster, +and a four-state page reader (processor) in the multicatcher. +The former is to synchronize the task each machine is performing. +For example, the master will not start sending +the pages of a file unless all machines have acknowledged +the completion of openning the disk i/o for the file. +In order to accomodate these elements, the codes have been +changed significantly from the original version. +For example, the multicatcher now never asks for slowing down. +And multicaster sends data on a file-by-file basis. +The file integrity is achieved by orchestrating the +data flow which is closely monitored and conducted +by the master machine. + +[200505] we add congestion control into the code. +After the master sends one page for a file, it will not send +the next page until it receives the acknowledgement (ack) message +sent by a monitor target (a feedbacker if you will). +This simply-minded congestion control prevents sending pages +at a pace with which a busy network cannot digest. +The feedbacker is chosen at the initialization stage of multicaster. +It may be changed by multicaster whenever it fails to send back +ack message within a certain duration. + +[2006] The monitor machine will send back ack message +only after it has written the page to disk. This way the 'rtt' +calculated by the master machine includes the disk IO time, +instead of just the network-traffic time. The master also +selects the slowest target machine as the monitor machine. +These two ingredients provide more precise picture of +how busy the whole system is, and thus tend to leave no target +machine behind. + +As of today, mrsync has been in full use at Renaissance +on a regular basis. + ++-------------------+ +| TEST RUNNING TIME | ++-------------------+ +[200810] using version 4.0.0. The performance improves (if the target machines + are not busy). + Number of targets = 32. All linux machines on 1Gbit network. + +Total number of files = 283 Pages w/o ack = 0 ( 0.00%) +Total number of pages = 100129 Pages re-sent = 6852 ( 6.84%) +Total number of bytes = 6449407171 Bytes re-sent = 442030792 ( 6.85%) +Total time spent = 2.55 (min) ~ 0.37 (min/GB) + +Send pages time = 1.78 (min) ~ 0.26 (min/GB) + +rtt histogram +msec counts +---- -------- + 0 103415 + 1 1985 + 2 162 + 3 73 + 4 145 + 5 68 + 6 253 + 7 203 + 8 305 + 9 184 + 10 156 + +[200604] using version 3.x.x, here is an example of the final statistics + in one of our routine syncing jobs. + Number of targets = 32. All linux machines on 1Gbit network. + +total number of files = 4076 +Total number of pages = 480719 Pages re-sent = 39994 ( 8.32%) +Total number of bytes = 30875638130 Bytes re-sent = 2573924662 ( 8.34%) +Total time spent = 39.50 (min) ~ 1.18 (min/GB) + +rtt histogram +msec counts +---- -------- + 0 371023 + 1 118874 + 2 21977 + 3 3779 + 4 2226 + 5 760 + 6 584 + 7 580 + 8 288 + 9 162 + 10 92 + ... + +[200201] +25 minutes for a group of files whose total size amounts to 4.6GB. +(This data is obtained from running on 5 SUN machines + with Solaris 8 on an Ethernet LAN whose bandwidth is 100Mbits/sec.) + ++-------------------------------------+ +| MAIN ALGORITHM FOR SENDING ONE FILE | ++-------------------------------------+ +In the multicaster code running on the master machine: +In the initialization stage, the code selects one target machine +as a monitor machine (feedbacker), which sends back acknowledgement +message when one page is received. +The main loop for multicaster is as follows. +(1) Send OPEN_FILE command (message). +(2) Wait for all machine to send back acknowledgement (ack). +(3) If any machine does not ack back within a certain time period, + that machine is considered bad and is dropped out + of the monitoring list. + During that waiting time period, the master sends the + OPEN_FILE message to those potentially bad machines regularly. +(4) Start sending pages, + (all pages, in the first time round; + those missing pages reported back by target machines, in other rounds). + During this period, all target machines except the feedbacker, if activated, + just receive and process whatever pages that arrive at the receive buffer. + + There are two modes of operation for the master to send these pages. + (a) When the congestion control is turned off (-x option in mrsync.py) + the master sends out one page and wait for a duration (interpage interval), + which is specified as DT_PERPAGE in main.h, before it sends the next one. + (b) If the congestion control is turned on (the default behavior of mrsync.py), + the master will not send the next page until it receives the ack. + On the feedbacker, once a SIGIO is triggered indicating the arrival + of a page, it sends back ack message to the master. +(5) Send EOF message to signal the end of transmission + for this file. +(6) Wait for all machines to send back ack. + They either report ok + or report a list of page numbers that are missing. +(7) If any machine does not ack back within a certain time period, + that machine is considered bad and is dropped out + of the monitoring list. + During that waiting time period, the master sends the + EOF message to those potentially bad machines regularly. +(8) If there are missing pages, go back to (4). +(9) Send CLOSE_FILE message. +(10) Wait for all machine to send back ack. +(11) If any machine does not ack back within a certain time period, + that machine is considered bad and is dropped out + of the monitoring list. + During that waiting time period, the master sends the + CLOSE_FILE message to those potentially bad machines regularly. +(12) If there are more files to transfer, go back to (1). + + +In multicatcher running on the target machine: +the main loop performs the following steps, +(1) Start with IDLE_STATE. +(2) Upon receiving OPEN_FILE and being in IDLE_STATE, + set up a momory mapped temporary file whose size + equals that specified in the meta_data, + which has been received in prior time. + if things are not successful, + don't send back ack. + else + send back ack and enter GET_DATA_STATE. +(3) Upon receiving one UDP and being in GET_DATA_STATE, + store it into the right place in the temporary file. + Expect to receive more UDP data and go back to (3). +(4) Upon receiving EOF and being in GET_DATA_STATE, + if there are some missing pages, + report them and expect to go back to (3). + else there are no missing page, + send back ack and enter DATA_READY_STATE. + if there is sick conditions, + send back ack and enter SICK_STATE. +(5) Upon receiving CLOSE_FILE and being in DATA_READY_STATE, + if in DATA_READY_STATE, + rename(temporary_file, the_real_filename), + clean up the memory mapped area. + if things go well, + send back the ack, enter IDLE_STATE + and expect to go back to (1), + else don't send back ack. + else if in SICK_STATE, + send back sit_out ack, enter IDLE_STATE + and expect to go back to (1). + +In addition to the above main loop that runs on all target machines, +the selected monitor machine (feedbacker) needs to send back +an received_ack message for every page received and processed. + +-------------------------------------------------------------- +Note: The original version dealt with three types of 'files': + directory, softlink and regular file. + mrsync includes one more: hardlink file. + +HP Wei +hp@rentec.com +Renaissance Technologies Corp. +Nov 15, 2001 (first version) +May 20, 2005 (second version) +Apr 16, 2006 (third version) +Oct 28, 2008 (fourth version) + + + diff --git a/backup.c b/backup.c new file mode 100644 index 0000000..3c59a9d --- /dev/null +++ b/backup.c @@ -0,0 +1,108 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include /* use POSIX in order to be portable to linux */ + +extern int verbose; +int backup = FALSE; +char *pattern_baseDir = NULL; + +/* --------- for syncing all files with/without backup. */ +int nPattern=0; /* number of regular expression for backup files */ +regex_t ** fPatterns;/* array of pointers to regular expression */ + +int set_nPattern(char * fpat_file) +{ + FILE *fd; + char pat[PATH_MAX]; + int count = 0; + if ((fd = fopen(fpat_file, "r"))==NULL) { + fprintf(stderr, "Cannot open file -- %s\n", fpat_file); + return FAIL; + } + while (!feof(fd)) { + if (fscanf(fd, "%s", pat) == 1) { + ++count; + } + } + nPattern = count; + fclose(fd); + return SUCCESS; +} + +int read_backup_pattern(char * fpat_file) +{ + FILE *fd; + char pat[PATH_MAX]; + int i = 0; + + if (!set_nPattern(fpat_file)) return FAIL; + + fPatterns = malloc(sizeof(void *)*nPattern); + for(i = 0; i< nPattern; ++i) { + fPatterns[i] = (regex_t *) malloc(sizeof(regex_t)); + } + + fd = fopen(fpat_file, "r"); + + /* + if we don't prepend ^, + then pattern 'file' intended for files under srcBase + will select files with pattern = subdir/file.* + which is not our intention. + */ + i = 0; + while (!feof(fd)) { + if (fscanf(fd, "%s", pat) == 1) { + char fullpat[PATH_MAX]; + sprintf(fullpat, "^%s", pat); + + regcomp(fPatterns[i], fullpat, REG_EXTENDED|REG_NOSUB); + ++i; + } + }; + fclose(fd); + return SUCCESS; +} + +int needBackup(char * filename) /* fullpath */ +{ + /* we reach this point when the backup flag is true + if no_pattern -> backup all of them + if pattern + if match -> backup + nomatch -> no-backup + */ + int i; + char *p; + if (!backup) return FALSE; + if (nPattern==0) return TRUE; + + p = filename + strlen(pattern_baseDir) + 1; /* +1 to get pass the / after basedir */ + /* fprintf(stderr, "nPattern = %d file= %s\n", nPattern, p); ********/ + for(i=0; i/dev/null" % machine; + os.system(killJobs); + return 0; + return 1; + +def docmd(rsh, machine, cmd): + # if scheme tmpfile is not used, p will fail to return if the + # output of cmd is large + tmpFile = tempfile.mktemp(); + p = popen2.Popen3("%s %s '%s' > %s 2>/dev/null" % (rsh, machine, cmd, tmpFile)); + + if not waitForJob(p, machine): return (0, []); + + lines = open(tmpFile).readlines(); + os.remove(tmpFile); + return (1, lines); + +# check if the machine can be talked to by rsh +def isMachineOK(rsh, machine): + p = popen2.Popen3("%s %s 'date 2>/dev/null'" % (rsh, machine)); + + if not waitForJob(p, machine): return False; + + lists = p.fromchild.readlines(); + + if len(lists)==0: return False; + return True; + diff --git a/complaint_sender.c b/complaint_sender.c new file mode 100644 index 0000000..53cfdb4 --- /dev/null +++ b/complaint_sender.c @@ -0,0 +1,158 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" + +/* complaint_send_socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +extern int my_FLOW_PORT; +extern int verbose; + +int seq = 0; + +/* send buffer */ +char complaint_buffer[FLOW_BUFFSIZE]; +int *ccode_ptr; /* complain code ---- see main.h */ +int *cmid_ptr; /* which machine*/ +int *cfile_ptr; /* which file -- for missing page */ +int *npage_ptr; /* # of pages -- for missing page */ +int *pArray_ptr; /* missing page arrary */ +int *fill_ptr; /* point to next array element */ + +/* ---------------------------------------------------------------- + routines to fill in pArray with the missing page indexes + --------------------------------------------------------------- */ +void fill_in_int(int i) +{ + *fill_ptr++ = htonl(i); +} + +void init_fill_ptr() +{ + fill_ptr = pArray_ptr; +} + +/*---------------------------------------------------------- + init_complaint_sender initializes the buffer to allow the + catcher to send complaints back to the sender. + + ret_address of sender to whom we will complain + is determined when we receive the first UDP data + in read_handle_page() in page_reader.c + ----------------------------------------------------------*/ +void init_complaint_sender() /* (struct sockaddr_in *ret_addr) */ +{ + /* ret_addr is sent by master, in network-byte-order */ + if (verbose>=2) + fprintf(stderr, "in init_complaint_sender\n"); + /* init the send_socket */ + complaint_fd = complaint_socket(&complaint_addr, my_FLOW_PORT); + + /* set up the pointers so we know where to put complaint_data */ + ccode_ptr = (int *) complaint_buffer; + cmid_ptr = (int *)(ccode_ptr + 1); + cfile_ptr = (int *)(cmid_ptr + 1); + npage_ptr = (int *)(cfile_ptr + 1); + pArray_ptr= (int *)(npage_ptr + 1); +} + +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin_addr); +} +#else +void update_complaint_address(struct sockaddr_in6 *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin6_addr); +} +#endif + +/*------------------------------------------------------------------------ + send_complaint fills the complaint buffer and send it through our socket + back to the sender + + The major use is to tell master machine which pages of which file + needs to be re-transmitted. + complaint -- the complain code defined in main.h + mid -- machine id + file -- the file index + npage -- # of missing pages + followed by an array of missing page index [ page_1, page_2, ... ] + + It is also used for sending back acknoledgement. + complaint -- the ack code defined in main.h in the same complaint section. + mid -- machine id + file -- which file + page -- seq number (out of seq complaints will be ignored by the catcher) + ------------------------------------------------------------------------*/ +void send_complaint(int complaint, int mid, int page, int file) +{ + /* fill in the complaint data */ + /* 20060323 add converting to network byte-order before sending out */ + int bytes; + *ccode_ptr = htonl(complaint); + *cmid_ptr = htonl(mid); + *cfile_ptr = htonl(file); + if (complaint==MISSING_PAGE || complaint==MISSING_TOTAL) { + *npage_ptr = htonl(page); + } else { + *npage_ptr = htonl(seq++); + } + + bytes = (complaint==MISSING_PAGE) ? ((char*)fill_ptr - (char*)ccode_ptr) + : (char*)pArray_ptr - (char*)ccode_ptr; + + /* send it */ + if(sendto(complaint_fd, complaint_buffer, bytes, 0, + (const struct sockaddr *)&complaint_addr, + sizeof(complaint_addr)) < 0) { + perror("Sending complaint\n"); + } + if (verbose>=2) + printf("Sent complaint:code=%d mid=%d page=%d file=%d bytes=%d\n", + complaint, mid, page, file, bytes); +} + + + + + + + + diff --git a/complaints.c b/complaints.c new file mode 100644 index 0000000..fb48da2 --- /dev/null +++ b/complaints.c @@ -0,0 +1,579 @@ +/* + Copyright (C) 2005-2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +/*#include "paths.h"*/ +#include +#include +#include +#include + +extern int monitorID; /* defined in multicaster.c */ +extern int my_FLOW_PORT; +extern int verbose; +extern int nPages; +extern char * cmd_name[]; +extern unsigned int total_pages, real_total_pages; +extern off_t total_bytes, real_total_bytes; + +/* buffer for receiving complaints */ +char flow_buff[FLOW_BUFFSIZE]; +int *code_ptr; /* What's wrong? */ +int *mid_ptr; /* machine id */ +int *file_ptr; /* which file */ +int *npage_ptr; /* # of pages */ +int *pArray_ptr;/* missing page arrary */ + +/* receive socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +/* status */ +char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */ +int *last_seq; /* array of size nMachines -- seq number of complaints */ + /* *** watch out the max seq = 2e9 */ +int *total_missing_page; /* array of size nMachines -- persistent thru life of program*/ +char *file_received; /* array of size nMachines */ +char *bad_machines; /* array of size nMachines -- persistent thru life of program*/ +char *machine_status; /* array of size nMachines for ack */ +int *missing_pages; /* array of size nMachines */ + +int nMachines; +int has_missing; /* some machines have missing pages for this file (a flag)*/ +int has_sick; /* some machines are sick for this file (a flag)*/ +int skip_count=0; /* number of files that are not delivered */ +int quitWithOneBad=FALSE; /* default: continue with one or more (=2) + fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE); + + /* get pointers set to the right place in buffer */ + code_ptr = (int *) flow_buff; + mid_ptr = (int *) (code_ptr + 1); + file_ptr = (int *) (mid_ptr + 1); + npage_ptr = (int *) (file_ptr + 1); + pArray_ptr= (int *) (npage_ptr+1); + + /* Receive socket (the default buffer size is 65535 bytes */ + if (verbose>=2) printf("set up receive socket for complaints\n"); + complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT); + + /* + getsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &i, &il); + printf(" rcvbuf = %d type = %d\n", i, il); + exit(0); + the default in our machines -> size = 65535 and type = 4 + */ + + rcv_size = TOTAL_REC_PAGE * FLOW_BUFFSIZE; + if (setsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){ + perror("Expanding receive buffer for init_complaints"); + } +} + +void init_missing_page_flag(int n) +{ + int i; + nPages = n; + if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) { + fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n); + perror("error = "); + exit(0); + } + for(i=0; i= (unsigned int) nMachines) || /* boundary check for mid_v for safety */ + (bad_machines[mid_v] == BAD_MACHINE)) { /* ignore complaint from a bad machine*/ + return 0; + } + + code_v = ntohl(*code_ptr); + file_v = ntohl(*file_ptr); + npage_v = ntohl(*npage_ptr); + + /* check if the complaint is for the current file */ + if (code_v != MONITOR_OK && file_v != current_entry()) return 0; + /* out of seq will be ignored */ + if (code_v != MISSING_PAGE && code_v != MISSING_TOTAL) { /********* MISSING_TOTAL ? *************/ + if (npage_v <= last_seq[mid_v]) return 0; + else last_seq[mid_v] = npage_v; + } + + switch (code_v) { + case PAGE_RECV: + /******** check if machineID is the one we have set. */ + /*if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID);*/ + if (cmd == SENDING_DATA && mid_v == monitorID) + return 1; + else + return 0; + + case MONITOR_OK: + /********* check if machineID is the one we have set. */ + if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID); + if (cmd == SELECT_MONITOR_CMD && mid_v == monitorID) + return 1; + else + return 0; + + case OPEN_OK : + if (cmd == OPEN_FILE_CMD) { + machine_status[mid_v] = MACHINE_OK; + return 1; + } else { + return 0; + } + + case CLOSE_OK : + if (cmd == CLOSE_FILE_CMD || cmd == CLOSE_ABORT_CMD) { + machine_status[mid_v] = MACHINE_OK; + return 1; + } else { + return 0; + } + + case EOF_OK : + if (cmd == EOF_CMD && file_received[mid_v]==NOT_RECV) { + machine_status[mid_v] = MACHINE_OK; + file_received[mid_v] = FILE_RECV; + return 1; + } else { + return 0; + } + + case MISSING_PAGE : + if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0; + if (npage_v > nPages) return 0; + { + int i, *pi, page_v; + pi = pArray_ptr; + for (i = 0; i nPages) continue; /*** make sure page_v starts with 1*/ + missing_page_flag[page_v-1] = MISSING; + } + } + missing_pages[mid_v] += npage_v; + set_has_missing(); + return 1; + + case MISSING_TOTAL: + if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV || machine_status[mid_v] == MACHINE_OK) + return 0; + /* Consider to add: if npage_v >missing_pages[mid_v], ask to resend + [ likely no big gain ] */ + total_missing_page[mid_v] += npage_v; + set_has_missing(); /* store the info about missing info */ + machine_status[mid_v] = MACHINE_OK; /* machine_status serves as ack only */ + return 1; + + case SIT_OUT : + if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0; + fprintf(stderr, "*** %s sits-out-receiving %s\n", + id2name(mid_v), getFilename()); + machine_status[mid_v] = MACHINE_OK; + + if (!has_sick) ++skip_count; + set_has_sick(); + return 1; + + default : + if (verbose>=2) fprintf(stderr, "Unknown complaint: code = %d\n", code_v); + return 0; + } /* end of switch */ + } /* end of if(readable) */ + + /* time out of readable() */ + return -1; +} + +int all_machine_ok() +{ + int i; + for(i=0; i= ACK_WAIT_PERIOD) { + ++count; + if (count < my_ACK_WAIT_TIMES) { + if (verbose>=1 && (count % 10 == 0)) + fprintf(stderr, " %d: resend cmd(%s) to machines:[ ", count, cmd_name[code]); + for(i=0; i=1 && (count % 10 == 0)) fprintf(stderr, "%d ", i); + send_cmd(code, (int) i); + usleep(FAST); + } + } + if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "]\n"); + rtime0 = rtime1; + } else { /* allowable period of time has passed */ + fprintf(stderr, " Drop these bad machines:[ "); + for(i=0; i0) { + fprintf(stderr, "\nWarning: There are %d files which are not delivered.\n", skip_count); + exit_code = -1; + } + + fprintf(stderr, "\nTotal number of files = %12d Pages w/o ack = %12u (%6.2f%%)\n", + total_entries(), pages_wo_ack(), (double)pages_wo_ack()/(double)real_total_pages*100.0); + + dp = real_total_pages - total_pages; + fprintf(stderr, "Total number of pages = %12d Pages re-sent = %12u (%6.2f%%)\n", + total_pages, dp, (double)dp/(double)total_pages*100.0); + + delta = (off_t)(real_total_bytes - total_bytes); + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "Total number of bytes = %12llu Bytes re-sent = %12llu (%6.2f%%)\n", + total_bytes, delta, (double)delta/(double)total_bytes*100.0); + #else + fprintf(stderr, "Total number of bytes = %12d Bytes re-sent = %12u (%6.2f%%)\n", + total_bytes, delta, (double)delta/(double)total_bytes*100.0); + #endif + + return (exit_code); +} + +/* count the number of bad machines */ +int nBadMachines() +{ + int i, count = 0; + for(i=0; i 0) fprintf(stderr, "]\n"); + return count; +} + +int send_done_and_pr_msgs(double total_time, double t_page) +{ + int exit_code1 =0; + int exit_code2 =0; + int exit_code3 =0; + + send_all_done_cmd(); + + /* exit_code1 !=0 if there are files that were not delivered due to change or skipped */ + exit_code1 = pr_missing_pages(); + + fprintf(stderr, "Total time spent = %6.2f (min) ~ %6.2f (min/GB)\n\n", + total_time, total_time / ((double)real_total_bytes/1.0e9)); + fprintf(stderr, "Send pages time = %6.2f (min) ~ %6.2f (min/GB)\n\n", + t_page, t_page / ((double)real_total_bytes/1.0e9)); + + exit_code2 = choose_print_machines(bad_machines, + BAD_MACHINE, + "Not synced for bad machines:[ "); + + if (quitWithOneBad && nBadMachines() >=1) { + fprintf(stderr, "We choose to exit when at least one target is bad\n"); + fprintf(stderr, "All files following the current one did not get delivered\n"); + fprintf(stderr, "If resend cmd(CLOSE_FILE), then the current file may have been delivered to non-bad targets\n\n"); + } + + if (current_entry() < total_entries()) { /* if we exit prematurely */ + exit_code3 = choose_print_machines(machine_status, + NOT_READY, "\nNot-ready machines:[ "); + } + + if (verbose>=1) pr_rtt_hist(); + return (exit_code1+exit_code3); /* 200807 removed exit_code2 because bad machines case has been dealt with + by -q. If no -q, then the bad machines are considered 'harmless' */ +} + +/* to do some cleanup before exit IF all machines are bad */ +void do_badMachines_exit() +{ + if ((quitWithOneBad && nBadMachines() < 1) || + (!quitWithOneBad && (nBadMachines() < nMachines))) return; + + if (quitWithOneBad) + fprintf(stderr, "One (or more) machine is bad. Exit!\n"); + else + fprintf(stderr, "All machines are bad. Exit!\n"); + + send_done_and_pr_msgs(-1.0, -1.0); + exit(-1); +} + +void do_cntl_c(int signo) +{ + fprintf(stderr, "Control_C interrupt detected!\n"); + + send_done_and_pr_msgs(-1.0, -1.0); + exit(-1); +} diff --git a/file_operations.c b/file_operations.c new file mode 100644 index 0000000..d9a8733 --- /dev/null +++ b/file_operations.c @@ -0,0 +1,800 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + This file contains all the file-operations for multicatcher. + 20060404: + found a undetected omission in open_file(). + AFter lseek(), we should write a dummy byte so that + multicatcher.zzz has the right file size to start with. + Otherwise, it will grow as syncing progresses. + + Port the code to deal with Large_files. + esp in write_page(), + lseek(fout, (off_t)(page-1)*(off_t)PAGE_SIZE, SEEK_SET) + 200603: + Remove the meta-data operation. + Each file's info (stat) is transfered to targets during + the OPEN_FILE_CMD. extract_file_info() in this file + is to get that stat info for the current entry(file). + + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + Previously, memory mapped file was used for file IO + but later was changed to simple open() and lseek(), write(). + This was also echoed in a patch by Clint Byrum . + + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was originally called wish_list.c + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include +#include "main.h" + +extern int verbose; +extern int machineID; + +char * baseDir = NULL; +char * missingPages=NULL; /* starting address of the array of flags */ +int fout; /* file discriptor for output file */ + +int toRmFirst = FALSE; /* remove existing file first and then sync */ +/* off_t total_bytes_written;*/ +unsigned int nPages; +int had_done_zero_page; +int current_file_id = 0; /* 1, 2, 3 ... or -1, -2 ... for backup */ +int backup; /* flag for a file that needs backup when deletion */ +char *backup_suffix = NULL; +char my_backup_suffix[] = "_mcast_bakmmddhhmm"; + +/* file stat_info which is transmitted from master */ +mode_t stat_mode; +nlink_t stat_nlink; +uid_t stat_uid; +gid_t stat_gid; +off_t stat_size; +time_t stat_atime; +time_t stat_mtime; + +char filename[PATH_MAX]; +char linktar[PATH_MAX]; +char fullpath[PATH_MAX]; +char tmp_suffix[L_tmpnam]; + +void default_suffix() +{ + time_t t; + struct tm tm; + time(&t); + localtime_r(&t, &tm); + sprintf(my_backup_suffix, "_mcast_bak%02d%02d%02d%02d", + tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min); + backup_suffix = my_backup_suffix; + return; +} + +void get_tmp_suffix() +{ + /* this is called once in each multicatcher */ + char tmp[L_tmpnam]; + tmpnam_r(&tmp[0]); + strcpy(tmp_suffix, basename(tmp)); +} + +int make_backup() +{ + char fnamebak[PATH_MAX]; + if (strlen(fullpath) + strlen(backup_suffix) > (PATH_MAX-1)) { + fprintf(stderr, "backup filename too long\n"); + return FAIL; + }; + + if (!backup) return SUCCESS; /* if not match with pattern, skip the backup */ + + sprintf(fnamebak, "%s%s", fullpath, backup_suffix); + /* + The backup scheme is as follows. + ln file file.bak (mv file file.bak would cause file to non-exist for a short while) + mv file.new file + */ + if (link(fullpath, fnamebak) != 0) { + if (errno != ENOENT && errno != EINVAL) { + fprintf(stderr,"hardlink %s => %s : %s\n",fullpath, fnamebak, strerror(errno)); + return FAIL; + } + } + if (verbose >= 2) { + fprintf(stderr, "backed up %s to %s\n",fullpath, fnamebak); + } + return SUCCESS; +} + +void get_full_path(char * dest, char * sub_path) +{ + /* prepend the sub_path with baseDir --> dest */ + strcpy(dest, baseDir); + strcat(dest, "/"); + strcat(dest, sub_path); +} + +void get_tmp_file(char * tmp) +{ + /*char *fncopy;*/ + /* ******* change to filename_mcast.fileabc%$&? */ + /* fncopy = strdup(filename); dirname change the string content */ + strcpy(tmp, baseDir); + strcat(tmp, "/"); + strcat(tmp, filename); + strcat(tmp, "_"); + strcat(tmp, TMP_FILE); + strcat(tmp, tmp_suffix); + /* free(fncopy); */ +} + +int my_unlink(const char *fn) +{ + if (verbose>=2) + fprintf(stderr, "deleting file: %s\n", fn); + if (unlink(fn) != 0) { + if (errno==ENOENT) { + return SUCCESS; + } else { + /* NOTE: unlink() could not remove files which do not have w permission!*/ + /* resort to shell command */ + char cmd[PATH_MAX]; + sprintf(cmd, "rm -f %s", fn); + if (system(cmd)!=0) { + fprintf(stderr, "'rm -f' fails for %s\n", fn); + return FAIL; + } + } + } + return SUCCESS; +} + +int my_touch(const char*fn) +{ + int fo; + if( (fo = open(fn, O_RDWR | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) < 0) { + perror(fn); + return FAIL; + } + + /* set the size of the output file */ + if(lseek(fo, 0, SEEK_SET) == -1) { + fprintf(stderr, "cannot seek for %s\n", fn); + perror(fn); + return FAIL; + } + close(fo); + return SUCCESS; +} + +int extract_file_info(char * buf, int n_file, unsigned int n_pages) +{ + /* + Along with OPEN_FILE_CMD, the data area in rec_buf contains + (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0 + where the stat_string contains the buf in + sprintf(buf, "%lu %lu %lu %lu %lu %lu %lu", st.st_mode, st.st_nlink, + st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime); + */ + char * pc = &buf[0]; + + #ifdef _LARGEFILE_SOURCE + if (sscanf(pc, "%u %u %u %u %llu %lu %lu %d", &stat_mode, &stat_nlink, + &stat_uid, &stat_gid, &stat_size, &stat_atime, &stat_mtime, + &toRmFirst) != 8) + return FAIL; + #else + if (sscanf(pc, "%u %u %u %u %lu %lu %lu %d", &stat_mode, &stat_nlink, + &stat_uid, &stat_gid, &stat_size, &stat_atime, &stat_mtime, + &toRmFirst) != 8) + return FAIL; + #endif + + /* fprintf(stderr, "size= %llu\n", stat_size); *********/ + + pc += (strlen(pc) + 1); + strcpy(filename, pc); + get_full_path(fullpath, filename); + + linktar[0] = '\0'; + if (S_ISLNK(stat_mode) || stat_nlink > 1) { /* if it is a softlink or hardlink */ + pc += (strlen(pc) +1); + strcpy(linktar, pc); + } + + nPages = n_pages; + current_file_id = n_file; + backup = (current_file_id < 0); + had_done_zero_page = FAIL; + /*total_bytes_written = 0;*/ + return SUCCESS; +} + +int open_file() +{ + int i; + + /* + sometimes for disk space reason, it is necessary + to first remove the file and sync. + If toReplace is true, the backup option should be off. + */ + if (toRmFirst) { + if (!my_unlink(fullpath)) { + fprintf(stderr, "Replacing "); + perror(filename); + return FAIL; + } + } + + /* fprintf(stderr, "%d %d %d\n", stat_mode, stat_nlink, stat_size); *********/ + if (S_ISREG(stat_mode) && stat_nlink == 1) { + /* if it is a regular file and not a hardlink */ + char tmpFile[PATH_MAX]; + get_tmp_file(tmpFile); + + my_unlink(tmpFile); /* make sure it's not there from previous runs */ + + if( (fout = open(tmpFile, O_RDWR | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) < 0) { + fprintf(stderr, "cannot open() %s for writing. \n", tmpFile); + perror(tmpFile); + return FAIL; + } + + /* set the size of the output file (see Steve's book on page 411) */ + if(lseek(fout, stat_size - 1 , SEEK_SET) == -1) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "lseek() error for %s with size = %llu\n", tmpFile, stat_size); + #else + fprintf(stderr, "lseek() error for %s with size = %u\n", tmpFile, (unsigned int)stat_size); + #endif + perror(tmpFile); + close(fout); + return FAIL; + } + if (write(fout, "", 1) != 1) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "write() error for %s with size = %llu\n", tmpFile, stat_size); + #else + fprintf(stderr, "write() error for %s with size = %u\n", tmpFile, (unsigned int)stat_size); + #endif + perror(tmpFile); + close(fout); + return FAIL; + } + } + + /* init missingPages flags */ + if (!missingPages) free(missingPages); + missingPages = malloc(sizeof(char) * nPages); + for(i=0; i < nPages; ++i) missingPages[i] = MISSING; + + if (verbose>=2) fprintf(stderr, "Ready to receive file = %s\n", filename); + return SUCCESS; +} + +int close_file() +{ + /* delete missingPages flags which was malloc-ed in open_file() */ + free(missingPages); + missingPages = NULL; + + if (S_ISREG(stat_mode) && stat_nlink == 1) { + /* if it is a regular file and not a hardlink */ + char tmpFile[PATH_MAX]; + struct stat stat; + + if ((fout != -1) && (close(fout) != 0)) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %llu \n", + filename, stat_size); + #else + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %u \n", + filename, (unsigned int)stat_size); + #endif + perror("close"); + return FAIL; + } + fout = -1; /* if the following fails, the reentry wont do munmap() */ + + /* the real work */ + /* get_full_path(oldFile, filename); **** unnecessary fullpath is the oldFile */ + + get_tmp_file(tmpFile); + + /* 8/14/2002 + If there was a hardware (disk IO) problem + the sync should not proceed. + ** Add the following checking. + */ + if (lstat(tmpFile, &stat)<0) { + perror("ERROR: close_file() cannot lstat the tmp file\n"); + return FAIL; + } + + if (backup && !make_backup(fullpath)) { /* make a hardlink oldFile => backup_file */ + fprintf(stderr, "fail to make backup for %s\n", fullpath); + return FAIL; + } + if (rename(tmpFile, fullpath)<0) { + perror("ERROR: close_file():rename() \n"); + return FAIL; + } + + /* 20071016 in rare occasion, the written file has not the right size */ + if (lstat(fullpath, &stat)<0) { + fprintf(stderr, "ERROR: close_file() cannot lstat %s\n", fullpath); + return FAIL; + } + if (stat_size != stat.st_size) { + fprintf(stderr, "ERROR: close_file() filesize != incoming-size\n"); + return FAIL; + } + } + + /* for debug + if (verbose>=2) fprintf(stderr, "total bytes written %llu for file %s\n", + total_bytes_written, filename); + */ + + return SUCCESS; +} + +int rm_tmp_file() +{ + char tmpFile[PATH_MAX]; + + if ((fout != -1) && (close(fout) != 0)) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %llu \n", + filename, stat_size); + #else + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %u \n", + filename, (unsigned int)stat_size); + #endif + perror("rm_tmp"); + return FAIL; + } + fout = -1; + + get_tmp_file(tmpFile); + + return (my_unlink(tmpFile)); +} + +int nPages_for_file() +{ + return nPages; +}; + +/* return total number of missing pages */ +int get_missing_pages() +{ + int i, result=0; + + for(i=0; i < nPages; ++i) + if ((missingPages[i]) == MISSING) ++result; + return result; +} + +int is_missing(int index) +{ + return (missingPages[index] == MISSING) ? TRUE : FALSE; +} + +void page_received(int index) +{ + missingPages[index] = RECEIVED; +} + +/* + write() in write_page() may block forever. + This function is to check if write() is ready. +*/ +int writable(int fd) +{ + struct timeval write_tv; + fd_set wset; + FD_ZERO(&wset); + FD_SET(fd, &wset); + + write_tv.tv_sec = WRITE_WAIT_SEC; + write_tv.tv_usec = WRITE_WAIT_USEC; + return (select(fd + 1, NULL, &wset, NULL, &write_tv)==1); +} + +void write_page(int page, char *data_ptr, int bytes) +{ + /* page = page number starting with 1 */ + if (page < 1 || page > nPages) return; + + /* Do we need to write this page? */ + if (is_missing(page-1)){ + if (!writable(fout)) return; + + /* Write the data */ + if (lseek(fout, (off_t)(page-1)*(off_t)PAGE_SIZE, SEEK_SET)<0) { + if (verbose>=1) { + fprintf(stderr, "ERROR: write_page():lseek() at page %d for %s\n", + page, filename); + perror("ERROR"); + } + return; + } + if (write(fout, data_ptr, bytes)<0) { + /* write IO error !!! */ + perror("ERROR"); + fprintf(stderr, "write_page():write() error: at page %d for %s\n", page, filename); + return; + } + + /* Mark the page as received in our wish list */ + page_received(page-1); + /*total_bytes_written += bytes;*/ + } else { + /* If we don't need to write it, just return */ + if (verbose >=2) { + fprintf(stderr, "Already have page %d for %d:Ignoring\n", page, current_file_id); + } + } + return; +} + +/* For files whose size is 0 */ +int touch_file() +{ + if (verbose >=2) + fprintf(stderr, "touching file: %s\n", fullpath); + + my_unlink(fullpath); + return my_touch(fullpath); + /* system() + VERY time in-efficient + char cmd[PATH_MAX]; + sprintf(cmd, "touch %s", fullpath); + return (system(cmd)==0); + */ +} + +int delete_file(int to_check_dir_type) +{ + struct stat st; + char fp[PATH_MAX]; + int trailing_slash; + int type_checking; + + strcpy(fp, fullpath); + + /* remove trailing slash if any -- for deletion-'type' checking */ + if (to_check_dir_type) { + char *pc; + type_checking = TRUE; + pc = &fp[0] + strlen(fp) - 1; + if (*pc=='/') { + *pc = '\0'; + trailing_slash = TRUE; + } else { + trailing_slash = FALSE; + }; + } else { + type_checking = FALSE; + } + + if(lstat(fp, &st) < 0) { + /* already gone ? */ + return SUCCESS; + } + if (S_ISREG(st.st_mode) || S_ISLNK(st.st_mode)) { /* delete a file or link */ + if (verbose>=2) + fprintf(stderr, "deleting file: %s\n", fp); + + if (type_checking && trailing_slash) { /* intended to remove a directory when it is not */ + return FAIL; + } + + if (backup && S_ISREG(st.st_mode) && !make_backup(fp)) {/* backup regular file */ + return FAIL; /* failed to make_backup */ + } + return (my_unlink(fp)); + } else if (S_ISDIR(st.st_mode)) { /* remove a directory */ + char cmd[PATH_MAX]; + if (verbose>=2) + fprintf(stderr, "deleting directory: %s\n", fp); + + if (type_checking && (!trailing_slash)) { /* intended to remove a non-dir when it is directory */ + return FAIL; + } + + sprintf(cmd, "rm -rf %s", fp); /* remove everything in dir, watch out for this */ + return (system(cmd)==0); + } + /* not file, link, directory */ + fprintf(stderr, "unrecognized file_mode for %s\n", fp); + return FAIL; +} + +/* send complaints to the master for missing data */ +int ask_for_missing_page() +{ + int i, n=0, total=0; + + /* + Send missing page indexes if any + */ + init_fill_ptr(); + for(i=0; i < nPages; ++i) { + if (missingPages[i] == MISSING ) { + ++n; + ++total; + if (n > MAX_NPAGE) { + /* send previous missing page-indexes */ + send_complaint(MISSING_PAGE, machineID, MAX_NPAGE, current_file_id); + init_fill_ptr(); + n = 1; + } + /* fill in one page index */ + fill_in_int(i+1); /* origin = 1 */ + } + } + /* send the rest of missing pages complaint */ + if (n>0) send_complaint(MISSING_PAGE, machineID, n, current_file_id); + + return total; /* there are missing pages */ +} + +void missing_page_stat() +{ + int i, n=0, sum=0, last=-1; + + for(i=0; i < nPages; ++i) { + if (missingPages[i] == MISSING ) { + ++n; + if (last<0) { + sum += i; + } else { + sum += (i - last); + } + last = i; + } + } + if (n>0) { + double a = sum; + double b = n; + fprintf(stderr, "file= %d miss= %d out-of %d avg(delta_index) = %f\n", + current_file_id, n, nPages, a/b); + } +} + +void my_perror(char * msg) +{ + char fn[PATH_MAX]; + sprintf(fn, "%s - %s", fullpath, msg); + perror(fn); +} + +int set_owner_perm_times() +{ + int state = SUCCESS; + + /* set owner */ + if (lchown(fullpath, stat_uid, stat_gid)!=0) { + my_perror("chown"); + state = FAIL; + } + + /* + set time and permission. + Don't try to set the time and permission on a link + */ + if (!S_ISLNK(stat_mode)) { + struct utimbuf times; + if (chmod(fullpath, stat_mode)!=0) { + my_perror("chmod"); + state = FAIL; + } + + times.actime = stat_atime; + times.modtime = stat_mtime; + if (utime(fullpath, ×)!=0) { + my_perror("utime"); + state = FAIL; + } + } + return state; +} + +int update_directory() +{ + struct stat st; + int exists; + char fp[PATH_MAX], *pc; + + if (verbose>=2) + fprintf(stderr, "Updating dir: %s\n", fullpath); + + /* if fullpath is a softlink that points to a dir, + and it has a trailing '/', + lstat() will view it as a directory ! + So, we remove the trailing '/' before lstat() */ + strcpy(fp, fullpath); + pc = &fp[0] + strlen(fp) - 1; + if (*pc=='/') *pc = '\0'; + + if(lstat(fp, &st) < 0) { + switch(errno) { + case ENOENT: + exists = FALSE; + break; + default: + my_perror("lstat"); + return FAIL; + } + } else { + exists = TRUE; + } + + if (!exists) { + /* There's nothing there, so create dir */ + if (mkdir(fp, stat_mode) < 0){ + my_perror("mkdir"); + return FAIL; + } + return SUCCESS; + } else if (!S_ISDIR(st.st_mode)) { + /* If not a directory delete what is there */ + if (unlink(fp)!=0){ + my_perror("unlink"); + return FAIL; + } + if (verbose>=2) + fprintf(stderr, "Deleted file %s to replace with directory\n", fp); + if (mkdir(fp, stat_mode) < 0){ + my_perror("mkdir"); + return FAIL; + } + return SUCCESS; + } else { + /* If dir exists, just chmod */ + chmod(fp, stat_mode); + /*** 20070410: changing mtime of a dir can cause NFS to confuse + See http://lists.samba.org/archive/rsync/2004-May/009439.html + So, I comment it out in the following + + struct utimbuf times; + times.actime = stat_atime; + times.modtime = stat_mtime; + ***/ + /* + if (utime(fullpath, ×) < 0) { + my_perror("utime"); + } + */ + return SUCCESS; + } +} + +int update_directory0() +{ + DIR *d; + + struct utimbuf times; + times.actime = stat_atime; + times.modtime = stat_mtime; + + if (verbose>=2) + fprintf(stderr, "Creating dir: %s\n", fullpath); + + d = opendir(fullpath); + if (d == NULL) { + switch(errno) { + case ENOTDIR: + /* If not a directory delete what is there */ + if (unlink(fullpath)!=0){ + my_perror("unlink"); + return FAIL; + } + if (verbose>=2) + fprintf(stderr, "Deleted file %s to replace with directory\n", fullpath); + /* Fall through to ENOENT */ + case ENOENT: + /* There's nothing there, so create dir */ + if (mkdir(fullpath, stat_mode) < 0){ + my_perror("mkdir"); + return FAIL; + } + return SUCCESS; + default: + my_perror("opendir"); + return FAIL; + } + } else { + /* If dir exists, just chmod */ + closedir(d); + chmod(fullpath, stat_mode); + /*** 20070410: changing mtime of a dir can cause NFS to confuse + See http://lists.samba.org/archive/rsync/2004-May/009439.html + So, I comment it out in the following ***/ + /* + if (utime(fullpath, ×) < 0) { + my_perror("utime"); + } + */ + return SUCCESS; + } +} + +int check_zero_page_entry() +{ + /* when total_pages = 0, the entry can be + an empty file + softlink (hardlink), + a directory + */ + if (had_done_zero_page) return SUCCESS; /* to avoid doing it again */ + + if (S_ISDIR(stat_mode)) { /* a directory */ + if (!update_directory()) { + had_done_zero_page = FAIL; + return FAIL; + } + } else if (S_ISLNK(stat_mode)) { /* Is it a softlink? */ + if (verbose>=2) + fprintf(stderr, "Making softlink: %s -> %s\n", fullpath, linktar); + delete_file(FALSE); /* remove the old one at fullpath */ + if (symlink(linktar, fullpath) < 0) { + my_perror("symlink"); + had_done_zero_page = FAIL; + return FAIL; + } + } else if (stat_nlink > 1) { /* hardlink */ + char fn[PATH_MAX]; + get_full_path(fn, linktar); /* linktar is a relative path from synclist */ + if (verbose>=2) + fprintf(stderr, "Making a hardlink: %s => %s\n", fullpath, fn); + my_unlink(fullpath); /* remove the old one */ + if (link(fn, fullpath)!=0) { + my_perror("link"); + had_done_zero_page = FAIL; + return FAIL; + } + } else { + /* it must be a regular file */ + + if (!touch_file()) { + had_done_zero_page = FAIL; + return FAIL; + } else { + set_owner_perm_times(); + } + } + had_done_zero_page = SUCCESS; + return SUCCESS; +} + diff --git a/global.c b/global.c new file mode 100644 index 0000000..64104eb --- /dev/null +++ b/global.c @@ -0,0 +1,35 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +char *cmd_name[] = { "TIME-OUT", + "TEST", + "SEND_DATA", + "RESEND_DATA", + "OPEN_FILE", + "EOF", + "CLOSE_FILE", + "CLOSE_ABORT", + "ALL_DONE", + "SEL_MONITOR", + "NULL"}; + +int verbose = 0; /* = 0 little detailed output, = 1,2 ... = n a lot more details */ +int machineID = -1; /* used for multicatcher */ + diff --git a/id_map.c b/id_map.c new file mode 100644 index 0000000..8b8dcaa --- /dev/null +++ b/id_map.c @@ -0,0 +1,74 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include +#include +#include +#include + +void strip(char * str); + +/* place to hold the array of string */ +char ** machine_names = NULL; /* array of (char*) */ +int nTargets=0; + +void get_machine_names(char * filename) +{ + FILE *fd; + char line[PATH_MAX]; + int count=0; + + if ((fd = fopen(filename, "r")) == NULL) { + fprintf(stderr, "Cannot open file -- %s \n", filename); + return; + } + while (fgets(line, PATH_MAX, fd) != NULL) { + strip(line); + if (strlen(line) != 0) ++count; + } + if (count == 0) { + fclose(fd); + fprintf(stderr, "No machine names in the file = %s\n", filename); + return; + } + + nTargets = count; + + rewind(fd); + machine_names = malloc(nTargets * sizeof(void*)); + + line[0] = '\0'; + count = 0; + while(fgets(line, PATH_MAX, fd) != NULL) { + strip(line); + if (strlen(line)==0) continue; + machine_names[count] = (char*)strdup(line); + line[0] = '\0'; + ++count; + } + fclose(fd); +} + +char * id2name(int id) +{ + return (machine_names) ? machine_names[id] : ""; +} diff --git a/main.h b/main.h new file mode 100644 index 0000000..48f8c55 --- /dev/null +++ b/main.h @@ -0,0 +1,189 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + Following the suggestion and the patch by Clint Byrum , + I added more control to selectively print out messages. + The control is done by the statement 'if (version >= n)' + + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __main_h +#define __main_h + +#include +#include +#include +#include +#include +#include +#include +#include +#include /* sockaddr_in{} and other Internet defns */ +#include /* inet(3) functions */ +#include +#include /* for nonblocking */ +#include +#include +#include +#include +#include +#include /* for S_xxx file mode constants */ +#include /* for iovec{} and readv/writev */ +#include +#include +#include /* timeval{} for select() */ +#include +#include + +#define VERSION "4.0.1" + +/* logic values */ +#define FALSE 0 +#define TRUE 1 +#define FAIL (FALSE) +#define SUCCESS (TRUE) +#define GOOD_EXIT 0 +#define BAD_EXIT -1 + +/* Ports and addresses */ +#define PORT 8888 /* for multicast */ +#define FLOW_PORT (PORT-1) /* for flow-control */ +#define MCAST_ADDR "239.255.67.92" +#define MCAST_TTL 1 +#define MCAST_LOOP FALSE +#define MCAST_IF NULL + +/* + Handling socket's receive buffer on the target machine: + if the available data size in the receiveing buffer is larger + than TOO_MUCH then a TOO_FAST complaint is triggered. + The master will then sleep for USEC_TO_IDLE + Currently, this is not effective. +*/ +#define TOO_FAST_LIMIT (TOTAL_REC_PAGE / 2) /* if half is full, then too fast */ +#define TOO_MUCH (TOO_FAST_LIMIT * PAGE_BUFFSIZE) +#define USEC_TO_IDLE 1000000 + +/* TIMING stuff */ +#define FAST 5000 /* usec */ +#define DT_PERPAGE 6000 /* usec */ +#define FACTOR 90 /* interpage interval = FACTOR * DT_PERPAGE or DT_PERPAGE*/ +#define SECS_FOR_KILL 30 /* time(sec) allowed for 'kill -9 pid' to finish */ + +/* time for the master to wait for the acknowledgement */ +#define ACK_WAIT_PERIOD 1 /* secs (from time()); */ +#define ACK_WAIT_TIMES 60 /* wait for this many periods */ + +#define SICK_RATIO (0.9) +#define SICK_THRESHOLD (50) /* SICK FOR such many TIMES is really sick */ + +/* max wait time for write() a page of PAGE_SIZE -- 100 msec */ +#define WRITE_WAIT_SEC 0 +#define WRITE_WAIT_USEC 100000 + +#define SET_MON_WAIT_TIMES 6000 /* time = this number * FAST */ +#define NO_FEEDBACK_COUNT_MAX 10 +#define SWITCH_THRESHOLD 50 /* to avoid switching monitor too frequently + because of small diff in missing_pages */ + +/* complaints */ +#define TOO_FAST 100 +#define OPEN_OK 200 +#define CLOSE_OK 300 +#define MISSING_PAGE 400 +#define MISSING_TOTAL 500 +#define EOF_OK 600 +#define SIT_OUT 700 +#define PAGE_RECV 800 +#define MONITOR_OK 900 + +/* Sizes */ +/* 20060427: removed size_t which is arch-dependent */ +#define PAGE_SIZE 64512 +#define HEAD_SIZE (sizeof(int) + 2 * sizeof(int) + 2 * sizeof(int)) +#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE) +#define TOTAL_REC_PAGE 20 /* change to 4 in case hit the OS limit in buf size */ + +#define FLOW_HEAD_SIZE (sizeof(int)*4) +#define FLOW_BUFFSIZE (PAGE_SIZE+FLOW_HEAD_SIZE) +#define MAX_NPAGE (PAGE_SIZE / sizeof(int)) +/* + Modes and command codes: + The numerical codes are also the index to retrieve the command names + for printing in complaints.c +*/ +#define TIMED_OUT 0 +#define TEST 1 +#define SENDING_DATA 2 +#define RESENDING_DATA 3 +#define OPEN_FILE_CMD 4 +#define EOF_CMD 5 +#define CLOSE_FILE_CMD 6 +#define CLOSE_ABORT_CMD 7 +#define ALL_DONE_CMD 8 +#define SELECT_MONITOR_CMD 9 +#define NULL_CMD 10 + +/* machine status ----- for caster */ +#define MACHINE_OK_MISSING_PAGES '\2' +#define MACHINE_OK '\1' +#define NOT_READY '\0' + +#define BAD_MACHINE '\1' +#define GOOD_MACHINE '\0' + +#define FILE_RECV '\1' +#define NOT_RECV '\0' + +/* representation of all-targets for sends */ +#define ALL_MACHINES -1 + +/* PAGE STATUS */ +#define MISSING '\0' +#define RECEIVED '\1' + +/* MACHINE STATE ----- for catcher */ +#define IDLE_STATE 0 +#define GET_DATA_STATE 1 +#define DATA_READY_STATE 2 +#define SICK_STATE 3 + +/* + The following two are info to be packed into + meta data to represent either file or directory deletion. +*/ +/* SPECIAL # of PAGES to signal deleting action */ +#define TO_DELETE (-1) + +/* temporary file name prefix for transfering to */ +#define TMP_FILE "mrsync." + +#include "proto.h" + +#endif diff --git a/mrsync.py b/mrsync.py new file mode 100755 index 0000000..bd8c0c7 --- /dev/null +++ b/mrsync.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python + +import os,sys,string,time,getopt; + +my_module_path = os.path.dirname(sys.argv[0]); +sys.path.append(my_module_path); +from mrsync_config import * + +host = os.uname()[1]; + +def bin(path, bin_name): + "return the full-path of a binary code" + return '%s/%s' % (path, bin_name); + +def isPathThere(path): + "check the existence of path on this master machine" + if not os.path.exists(path): + print >>sys.stderr, 'Path (%s) could not be found on %s' % (path, host); + sys.exit(-1); + +map(isPathThere, [bin(binDir, x) for x in codes]); + +def printTimeMsg(msg): + print >>sys.stderr, 'Time = %s %s' % (time.ctime(time.time()), msg); + +def prMulticastLog(msg): + "append msg to the log file" + open(multicast_log, 'a').write('%s %s\n' % (time.ctime(time.time()), msg)); + +(catcher_err_log, goodTargetsFile, syncFileList) = \ + map(lambda x: + '%s.%s' % (x, ('%02d'*5) % time.localtime()[1:6]), + (catcher_err_log, goodTargetsFile, syncFileList)); + +def usage(): + print """mrsync.py (to sync files from one to many machines by multicast) + Option list: + [ -v ] + [ -w ] + [ -b ] + [ -o ] + [ -x flag to turn off monitor mechanism (not fully tested and not recommended) ] + ----- Essential options -------------------------------------------------------- + -m + -s + [ -t ] + [ -l + mrsync by default uses rsync to find the list unless this option is given. ] + ----- mcast options ------------------------------------------------------------ + [ -A ] + [ -P ] + [ -T ] + [ -L flag to turn on mcast_LOOP. default is off ] + [ -I ] + """ % (rBinDir, min_rsync_opt%reshell); + +# --- handle command line +opts, args = getopt.getopt(sys.argv[1:], 'hxv:w:r:b:o:m:s:t:l:A:P:T:LI:', []); + +if len(opts)==0 or len(args)>0: + usage(); + sys.exit(-1); + +verbose = 0; +ack_wait_times = -1; +without_monitor = False; +machineListFile = ''; +sourcePath = ''; +targetPath = ''; +synclist = ''; +rsync_opts = min_rsync_opt % reshell; + +mcast_addr = ''; +port = -1; +ttl = 1; +loop = False; +mcast_if = ''; + +if not len(opts) == 0: + for o,v in opts: + if o=='-v': + verbose = string.atoi(v); + elif o=='-w': + ack_wait_times = string.atoi(v); + elif o=='-h': + usage(); + sys.exit(0); + elif o=='-r': + reshell = v; + elif o=='-b': + rBinDir = v; + elif o=='-o': + rsync_opts += (' %s' % v) + elif o=='-m': + machineListFile = v; + elif o=='-s': + sourcePath = v; + elif o=='-t': + targetPath = v; + elif o=='-l': + synclist = v; + elif o=='-A': + mcast_addr = v; + elif o=='-P': + port = string.atoi(v); + elif o=='-T': + ttl = string.atoi(v); + elif o=='-L': + loop = True; + elif o=='-I': + mcast_if = v; + elif o=='-x': + without_monitor = True; + +if verbose>=1: print 'mrsync version 4.0.0'; + +if not machineListFile or not sourcePath: + print >>sys.stderr, 'Essential options (-m -s) should be specified.'; + usage(); + sys.exit(-1); + +isPathThere(sourcePath); +if not targetPath: targetPath = sourcePath; + +# ------------ get machine list +machines = (os.path.exists(machineListFile) and + [x[:-1] for x in open(machineListFile).readlines()] or + machineListFile.split(',')); + +# clean up the names +machines = filter(lambda x: x!='', machines); +machines = [x.strip() for x in machines]; + +if host in machines: + if verbose>=1: print 'remove myself (%s) from machine list...' % host; + machines.remove(host); + +# ------------ get the syncList from the first good machine +# the list is stored in syncFileList for multicaster. +import cmdToTarget; + +def cleanup(file): + "remove a file if it exists" + if os.path.exists(file): os.unlink(file); + +def get_synclist_from_cmdline(tmp): + "extracts synclist from cmdline option, outputs results into tmp_file" + cleanup(tmp); + if verbose>=1: print >>sys.stderr, 'extracting %s...' % synclist; + + list = (os.path.exists(synclist) and + [x[:-1] for x in open(synclist).readlines()] or + synclist.split(',')); + + if len(list)==0: + print >>sys.stderr, "Empty sync_list in cmd_line option %s" % synclist; + sys.exit(-1); + + def pr_relative_path(fullpath): + open(tmp, 'a').write('%s\n' % fullpath.replace(sourcePath+'/', '')); + + for item in list: + " each item can be a pattern for files" + import glob + map(pr_relative_path, glob.glob('%s/%s' % (sourcePath, item))); + + +def get_synclist_from_rsync(tmp): + "use rsync to get a list of to-be-synced files. results are put in tmp file" + cleanup(tmp); + for machine in machines: + # check if this machine is ok to rsh (ssh) + if (not cmdToTarget.isMachineOK(reshell, machine)): # if not go to next machine + continue; + + if verbose>=1: + print >>sys.stderr, 'Get to-be-synced files from %s...' % machine; + + cmd = ' '.join(filter(lambda x: x, + [rsyncPath, '--rsync-path='+remote_rsyncPath, \ + (reshell != 'rsh' and \ + rsync_opts.replace('--rsh=rsh', '--rsh=%s' % reshell) \ + or rsync_opts), + sourcePath+'/', + '%s:%s/' % (machine, targetPath), + '> %s 2>&1; ' % tmp])); + + if verbose>=1: print >>sys.stderr, cmd; + os.system(cmd); + break; + +def tmp_synclist(): + "intermediate synclist filename" + return '/tmp/%s' % os.path.basename(syncFileList); + +(synclist and get_synclist_from_cmdline or get_synclist_from_rsync)(tmp_synclist()); + +def ckFileSize(file): + if os.path.getsize(file)==0: + print >>sys.stderr, "Empty file = %s" % file; + sys.exit(-1); + +ckFileSize(tmp_synclist()); + +# translate the files generated by rsync or command-line option into +# a format which can be recognized by multicaster. +cmd = ' '.join([bin(binDir, translate), + tmp_synclist(), sourcePath, '>', syncFileList]); +if verbose>=1: print >>sys.stderr, cmd; +os.system(cmd); + +ckFileSize(syncFileList); + +# ------------ invoke multicatcher on all target machines +def gen_catcher_cmd(count): + "return mulitcatcher_command to be invoked on target machines" + return ' '.join(filter(lambda x: x, + [bin(rBinDir, catcher), + '-t', targetPath, + '-i', '%d'%count, # machine id + (mcast_addr and '-A '+mcast_addr or ''), + (port>0 and '-P %d'%port or ''), + (mcast_if and '-I %s'%mcast_if or ''), + '/dev/null 2>%s &' % catcher_err_log])); ### workaround ssh problem + +def invoke_catcher(ms, count, bads): + "invoke catcher for each machine in ms, return bad_machines in bads" + if not ms: return bads; + + machine = ms.pop(0); + + if (not cmdToTarget.isMachineOK(reshell, machine)): + if verbose>=1: print >>sys.stderr, "***%s is down" % machine; + bads.append(machine); + return invoke_catcher(ms, count, bads); + + cmd = gen_catcher_cmd(count); + if count==0 and verbose>=1: print >>sys.stderr,cmd; + status, output = cmdToTarget.docmd(reshell, machine, cmd); + + if (not status): + if verbose>=1: print >>sys.stderr, "***remote shell exec failed for %s" % machine; + bads.append(machine); + return invoke_catcher(ms, count, bads); + + if verbose>=1: print >>sys.stderr, 'id:%3d %s' % (count, machine); + open(goodTargetsFile, 'a').write('%s\n' % machine) + return invoke_catcher(ms, count+1, bads); + +printTimeMsg("Invoking multicatcher on all %d machines..." % len(machines)); +cleanup(goodTargetsFile); +badMachines = invoke_catcher(machines, 0, []); + +# -------------- invoke multicast on the master machine +printTimeMsg('Starting multicasting...'); +prMulticastLog('start multicast on %s' % host); + +def gen_caster_cmd(): + "return mulitcaster_command to be invoked on this host (master machine)" + return ' '.join(filter(lambda x: x, + [bin(binDir, caster), + '-v %d' % verbose, + '-m %s' % goodTargetsFile, + '-s %s' % sourcePath, + '-f %s' % syncFileList, + (ack_wait_times>0 and '-w %d'% ack_wait_times or ''), + (mcast_addr and '-A '+mcast_addr or ''), + (port>0 and '-P %d'%port or ''), + (ttl>1 and '-T %d'%ttl or ''), + (loop and '-L' or ''), + (mcast_if and '-I %s'%mcast_if or ''), + (without_monitor and '-x' or '')])); + +cmd = gen_caster_cmd(); +if verbose>=1: print cmd; +ex_code = os.system(cmd); +print >>sys.stderr, 'ex_code= ', ex_code; + +# -------------- to exit +printTimeMsg('ALL DONE.'); +prMulticastLog('multicast ends on %s' % host); +sys.exit(ex_code); diff --git a/mrsync_config.py b/mrsync_config.py new file mode 100644 index 0000000..5953e24 --- /dev/null +++ b/mrsync_config.py @@ -0,0 +1,67 @@ +# configuration file + +# used by mrsync.py to find the executables multicaster, multicatcher, trFilelist +# hopefully, all machines put them in the same place as on the master machine. +binDir = '/a/path/name'; # bin +rBinDir = binDir; # for remote target machines. + +# This is a general bookkeeping file for mrsync.py. +# It records start_time and end_time of a mrsync session. +# Each mrsync.py appends time info into this file. +# So it should be a path that can be accessed by any machine invoking mrsync.py + +multicast_log = '/path/multicast.log'; + +#--------------------------------------------------------------------------- +# The following three file-names are for tmp storage +# +# To allow for multiple multicast sessions at the same time, the actual +# name of the file will be that specified here plus a suffix = .mmddhhmmss +# The latter (mmddhhmm) is generated by mrsync.py +# --> It is necessary to clean up old files :)) +# + +# This is a path on all target machines for a running multicatcher +# to catch its stderr output if any. Usually we don't expect any output in this file. +# It's there for debugging purpose in case any error event occurs. +# +# Set it to /dev/null, if we don't want any output. +# +catcher_err_log = '/a_path/catcher.err'; + +# The goodTargetsFile is for mrsync.py to output names of the machines that are accessible. +# It is read by multicaster. +caster_log_dir = '/A_PATH/sync/'; +goodTargetsFile = caster_log_dir + 'syncing'; + +# The file path for storing list of to-be-transfered files +# It is used by multicaster. +syncFileList = caster_log_dir + 'syncfile.lst'; +# +# --------------------------------------------------------------------------- +# + +# Path to find the rsync binary +rsyncPath = '/usr/local/bin/rsync'; +remote_rsyncPath = rsyncPath; + +# +reshell = "rsh"; # can be changed thru command line option + +# +# Do NOT change the following unless you know what you are doing. +# +# This is the absolute minimum of rsync options for multicaster +# rsync is used only to find files that need to be synced. +# -- to add other options such as --exclude --include, use command line option +# those extra options will be appended to this min_rsync_opt +min_rsync_opt = '--rsh=%s -avW --dry-run --delete'; + +# +# varioius binary codes used +# +# the binary to translate file_list generated by rsync or manually for multicaster +translate = 'trFilelist'; +catcher = 'multicatcher'; +caster = 'multicaster'; +codes = [ translate, catcher, caster]; diff --git a/multicaster.c b/multicaster.c new file mode 100644 index 0000000..3b26cae --- /dev/null +++ b/multicaster.c @@ -0,0 +1,582 @@ +/* + Copyright (C) 2006-2008 Renaissance Technologies Corp. + main developer: HP Wei + version 3.0 major update + -- large file support + -- platform independence (between linux, unix) + -- backup feature (as in rsync) + -- removing meta-file-info + -- catching slow machine as the feedback monitor + -- mcast options + version 3.0.[1-9] bug fixes + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwanranted SIT-OUT cases. + -- tested on Debian 64 bit arch by Nicolas Marot in France + version 3.1.0 + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. + version 3.2.0 + -- monitor change improvement + -- handshake improvement (e.g. seq #) + -- if one machine skips a file, all will NOT close() + version 4.0 major update + -- consolidate sending missing pages in complaint flow + cutting the messages by one order magnitude + -- exit code adjustment + + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#include /* to define PATH_MAX */ +#include +#include +#include + +extern int verbose; +extern char * machine_list_file; /* defined in complaints.c */ +extern char * bad_machines; /* array of size nMachines, defined in complaints.c */ +extern char * file_received; +extern int * missing_pages; +extern int backup; +extern int nPattern; +extern char * pattern_baseDir; +extern int nTargets; +extern int my_ACK_WAIT_TIMES; +extern int toRmFirst; +extern int quitWithOneBad; +extern int skip_count; +extern int file_changed; + +char * my_MCAST_ADDR = MCAST_ADDR; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; +int my_TTL = MCAST_TTL; +int my_LOOP = MCAST_LOOP; +char * my_IFname = MCAST_IF; + +int monitorID = -1; + +int no_feedback_count; + +void usage() +{ + fprintf(stderr, + "multicaster (to copy files to many multicatchers) version %s)\n" + " Option list:\n" + " [ -v ]\n" + " [ -w \n" + " -s \n" + " -f \n" + " -------- options for backup ---------------------------------\n" + " [ -b flag to turn on backup ]\n" + " [ -r for regex patterns for files needing backup ]\n" + " [ -d for regex patterns ]\n" + " -------- mcast options --------------------------------------\n" + " [ -A **same as for multicatcher ]\n" + " [ -P **same as for multicatcher ]\n" + " [ -T ]\n" + " [ -L flag turn on mcast_LOOP. default is off ]\n" + " [ -I ]\n", + VERSION, my_ACK_WAIT_TIMES, MCAST_ADDR, PORT, my_TTL); +} + + +int monitor_cmd(int cmd, int machineID) +{ + /* + Once this fx is called the old monitor will be + turned off. So, we need to make sure this fx + returns TRUE for a machine. Or the monitor_set_up + should fail. + */ + int count =0, resp; + set_delay(0, FAST); + send_cmd(cmd, machineID); + while (1) { /* wait for ack */ + resp = read_handle_complaint(cmd); + if (resp<0) { /* time-out */ + ++count; + send_cmd(cmd, machineID); + if (count > SET_MON_WAIT_TIMES) { + return FALSE; + } + } else if (resp==1) { /* got ack */ + return TRUE; + } + /* irrevelant resp -- continue */ + } +} + +int find_max_missing_machine(char *flags) +{ + /* flags[] -> which machines are not considered */ + int i, index = -1; + int max = -1; + int threshold; + + for(i=0; i max && flags[i] == GOOD_MACHINE) { + max = missing; + index = i; + } + } + + threshold = (get_nPages() > SWITCH_THRESHOLD*100) ? + get_nPages() / 100 : SWITCH_THRESHOLD; + if (index>=0) { + if (monitorID>=0) { + if (flags[monitorID]==BAD_MACHINE) return index; + /** if (max <= (total_missing_page[monitorID] + threshold)) **/ + if (max <= (missing_pages[monitorID] + threshold)) + return monitorID; + } + return index; + } else { /* could come here if all are busy*/ + return -1; + } + + /** + return ((index >= 0) && + (monitorID >= 0) && + (bad_machines[monitorID] == GOOD_MACHINE) && + (max <= (total_missing_page[monitorID] + threshold))) ? + monitorID : index; + **/ +} + +void set_monitor(int mid) +{ + /* one machine is to be set. So need to succeed. */ + if (monitor_cmd(SELECT_MONITOR_CMD, mid)) { + if (verbose >=1) fprintf(stderr, "Monitor - %s\n", id2name(mid)); + return; + } else { + fprintf(stderr, "Fatal: monitor %s cannot be set up!\n", id2name(mid)); + send_done_and_pr_msgs(-1.0, -1.0); + exit(BAD_EXIT); + } +} + +void check_change_monitor(int undesired_index) +{ + /* this function changes the value of the global var: monitorID */ + int i, count; + char * flags; + + /* if all targets received the file, no need to go on */ + if ((count=nNotRecv())==0) return; + + if (count==1) { + i = iNotRecv(); + if (bad_machines[i] == GOOD_MACHINE) { + monitorID = i; + /* + 'i' could be the current monitor. + We'd like to set it because there might + be something wrong with it if we come to + to this point. + */ + set_monitor(monitorID); + } + return; + } + + /* more than two machines do not receive the file yet */ + /* flags mark those machines as BAD which we don't want to consider */ + flags = malloc(nTargets * sizeof(char)); + for(i=0; i=1) fprintf(stderr, "Monitor = %s\n", id2name(monitorID)); + break; + } else { + flags[monitorID] = BAD_MACHINE; + /* Then, we attemp to set up other machine */ + } + ++count; + } + + free(flags); + if (monitorID < 0) { + fprintf(stderr, "Fatal: monitor machine cannot be set up!\n"); + send_done_and_pr_msgs(-1.0, -1.0); + exit(BAD_EXIT); + } +} + +void do_one_page(int page) +{ + int resp; + unsigned long rtt; + refresh_timer(); + start_timer(); + if (!send_page(page)) return; + + /* first ignore all irrelevant resp */ + resp=read_handle_complaint(SENDING_DATA); + while (resp==0) { + resp=read_handle_complaint(SENDING_DATA); + } + + /* read_handle_complaint() waits n*interpage_interval at most */ + if (resp==-1) { + /* delay_sec for readable() is set by set_delay() */ + /****** + at this point, the readable() returns without getting a reply + from monitorID after FACTOR*DT_PERPAGE (or DT_PERPAGE if without_monitor) + ****/ + ++no_feedback_count; + if (verbose>=2) printf("no reply, count = %d\n", no_feedback_count); + update_rtt_hist(999999); + /* register this page as rtt = infinite --- the last element in rtt_hist */ + + if (no_feedback_count > NO_FEEDBACK_COUNT_MAX) { + /* switch to another client */ + if (verbose >=2) + fprintf(stderr, + "Consecutive non_feedback exceeds limit, Changing monitor machine.\n"); + /* if (nTargets>1 && (nTargets - nBadMachines()) > 1 && nNotRecv() > 1) */ + check_change_monitor(monitorID); /* replace the current monitor */ + no_feedback_count = 0; + } + return; + } else { /* resp == 1 */ + end_timer(); + update_time_accumulator(); + rtt = get_accumulated_usec(); + /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */ + /* to do: update histogram */ + if (verbose >=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt); + update_rtt_hist(rtt); + + no_feedback_count = 0; + } +} + +void send_cmd_and_wait_ack(int cmd_code) +{ + send_cmd(cmd_code, (int) ALL_MACHINES); + refresh_machine_status(); + /*set_delay(0, FAST);*/ + set_delay(0, DT_PERPAGE*FACTOR); + if (cmd_code==EOF_CMD) mod_machine_status(); + wait_for_ok(cmd_code); + do_badMachines_exit(); + /* check_change_monitor(-1); */ +} + +int do_file_changed_skip() +{ + /* if file is changed during syncing, then we should skip this file */ + if (file_changed || !same_stat_for_file()) { + fprintf(stderr, "WARNING: file is changed during sycing -- skipping\n"); + send_cmd_and_wait_ack(CLOSE_ABORT_CMD); + free_missing_page_flag(); + ++skip_count; + return TRUE; + } + return FALSE; +} + +int main(int argc, char *argv[]) +{ + int c; + int cfile, ctotal_pages, cpage; + char * source_path = NULL; + char * synclist_path = NULL; + char * machine_list_file = NULL; + time_t tloc; + time_t time0, time1, t_page0, t_page; + + while ((c = getopt(argc, argv, "v:w:A:P:T:LI:m:s:f:br:d:Xq")) != EOF) { + switch (c) { + case 'v': + verbose = atoi(optarg); + break; + case 'w': + my_ACK_WAIT_TIMES = atoi(optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'T': + my_TTL = atoi(optarg); + break; + case 'L': + my_LOOP = TRUE; + break; + case 'I': + my_IFname = optarg; + break; + case 'm': + machine_list_file = optarg; + break; + case 's': + source_path = optarg; + break; + case 'f': + synclist_path = optarg; + break; + case 'b': + backup = TRUE; /* if nPattern==0, backup means back up ALL files */ + break; + case 'r': /* to selectively back up certain files as defined in the pattern */ + if (!read_backup_pattern(optarg)) { + fprintf(stderr, "Failed in loading regex patterns in file = %s\n", optarg); + exit(BAD_EXIT); + } + break; + case 'd': + pattern_baseDir = strdup(optarg); + if (pattern_baseDir[strlen(pattern_baseDir)-1]=='/') + pattern_baseDir[strlen(pattern_baseDir)-1] = '\0' ; /* remove last / */ + break; + case 'X': + toRmFirst = TRUE; + break; + case 'q': + quitWithOneBad = TRUE; + break; + case '?': + usage(); + exit(BAD_EXIT); + } + } + + if (!machine_list_file || !source_path || !synclist_path ) { + fprintf(stderr, "Essential options (-m -s -f) should be specified. \n"); + usage(); + exit(BAD_EXIT); + } + + if (nPattern>0) backup = TRUE; + if (backup && nPattern>0) { + if (!pattern_baseDir) pattern_baseDir = strdup(source_path); + if (strlen(source_path) < strlen(pattern_baseDir) || + strncmp(source_path, pattern_baseDir, strlen(pattern_baseDir))!=0) { + fprintf(stderr, + "src_path (%s) should include (and be longer than) pattern_baseDir (%s)", + source_path, pattern_baseDir); + exit(BAD_EXIT); + } + } + + if (backup && toRmFirst) { + fprintf(stderr, "-B and -X cannot co-exist\n"); + exit(BAD_EXIT); + } + + get_machine_names(machine_list_file); + if (nTargets==0) { + fprintf(stderr, "No target to sync to\n"); + exit(GOOD_EXIT); + } + + if (!init_synclist(synclist_path, source_path)) exit(BAD_EXIT); + + if (total_entries()==0) { + fprintf(stderr, "Nothing to sync in %s\n", synclist_path); + exit(GOOD_EXIT); + } + + if (verbose >= 2) + fprintf(stderr, "Total number of files: %d\n", total_entries()); + + /* init the network stuff and some flags */ + init_sends(); + init_complaints(); + init_machine_status(nTargets); + + /* set up Cntl_C catcher */ + Signal(SIGINT, do_cntl_c); + + /* ------------------- set up monitor machine for doing feedback for each page sent */ + check_change_monitor(-1); + + /*-------------------------------------------------------------------------------*/ + + init_rtt_hist(); + time0 = time(&tloc); /* start time */ + t_page = 0; /* total time for sending pages */ + + /* -----------------------------Send the file one by one -----------------------------------*/ + for (cfile = 1; cfile <= total_entries(); cfile++) { /* for each file to be synced */ + if (!get_next_entry(cfile)) continue; + + ctotal_pages = pages_for_file(); + + /* + By the time this file, which was obtained when synclist was + established some time ago, may no longer exist on the master. + So, we need to check the existence of this file. + fexist() also opens the file so that it won't be deleted + between here and the send-page-loop. + */ + if (ctotal_pages > 0 && (!same_stat_for_file() || + !fexist(current_entry()))) { + /* go to next file if this file has changed or does not exist */ + fprintf(stderr, "%s (%d out of %d; Extinct file)\n", + getFilename(), current_entry(), total_entries()); + adjust_totals(); + continue; + } + + if (ctotal_pages < 0) { + fprintf(stderr, "%s (%d out of %d; to delete)\n", + getFilename(), current_entry(), total_entries()); + } else { + fprintf(stderr, "%s (%d out of %d; %d pages)\n", + getFilename(), current_entry(), total_entries(), ctotal_pages); + } + + /* send_open_cmd */ + pack_open_file_info(); + send_cmd_and_wait_ack(OPEN_FILE_CMD); + + /* + ctotal_pages < 0, for deletion + ctotal_pages = 0, regular file with no content. + or directory, softlink, hardlink + both should have been finished with OPEN_FILE_CMD + */ + if (ctotal_pages <= 0) continue; + + /* for other regular files */ + init_missing_page_flag(ctotal_pages); + refresh_missing_pages(); /* total missing pages for this file for each tar */ + + /* ----- sending file data ----- first round */ + t_page0 = time(&tloc); + no_feedback_count = 0; + for (cpage = 1; cpage <= ctotal_pages; cpage++) { + /* + the mode field and delay may be changed by change_monitor + */ + set_delay(0, DT_PERPAGE*FACTOR); + set_mode(SENDING_DATA); + do_one_page(cpage); + } + + if (do_file_changed_skip()) continue; + + /* send "I am done with the first round" */ + reset_has_missing(); + refresh_file_received(); /* to record machines that have received this file */ + send_cmd_and_wait_ack(EOF_CMD); + + /* after the first run, before we go to 2nd and 3rd run, */ + if (has_missing_pages()) check_change_monitor(-1); + + /* ----- sending file data again, 2nd and 3rd and ...n-th round */ + reset_has_sick(); + while (has_missing_pages()) { + int c; /****************/ + no_feedback_count = 0; + + c = 0; + for (cpage = 1; cpage <= ctotal_pages; cpage++){ + if (is_it_missing(cpage-1)) { + set_delay(0, DT_PERPAGE*FACTOR); + set_mode(RESENDING_DATA); + do_one_page(cpage); + page_sent(cpage-1); + ++c; /*************/ + } + } + if (verbose>=1) + fprintf(stderr, "re-sent N_pages = %d\n", c); /*************/ + + /* eof */ + reset_has_missing(); + send_cmd_and_wait_ack(EOF_CMD); + if (has_sick_machines()) { + break; + /* one machine can reach sick_state while some others are still + in missing_page state. + This break here is ok in terms of skipping this file.*/ + } else { + check_change_monitor(-1); + } + }; + + t_page += (time(&tloc) - t_page0);; + if (do_file_changed_skip()) continue; + + /* close file */ + send_cmd_and_wait_ack((has_sick_machines()) ? CLOSE_ABORT_CMD : CLOSE_FILE_CMD); + if (has_sick_machines()) { + fprintf(stderr, "Skip_syncing %s\n", getFilename()); + } + free_missing_page_flag(); + } /* end of the for each_file loop */ + + time1= time(&tloc); + return send_done_and_pr_msgs( ((double)(time1 - time0))/ 60.0, ((double)t_page)/60.0); +} + diff --git a/multicatcher.c b/multicatcher.c new file mode 100644 index 0000000..76fbc5e --- /dev/null +++ b/multicatcher.c @@ -0,0 +1,181 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + verision 3.0 major update + -- large file support + -- platform independence (between linux, unix) + -- backup feature (as in rsync) + -- removing meta-file-info + -- catching slow machine as the feedback monitor + -- mcast options + version 3.0.[1-9] bug fixes + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwanranted SIT-OUT cases. + -- tested on Debian 64 bit arch by Nicolas Marot in France + version 3.1.0 + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. + version 3.2.0 + -- monitor change improvement + -- handshake improvement (e.g. seq #) + -- if one machine skips a file, all will NOT close() + + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" + +extern int machineID; +extern int verbose; +extern int isMonitor; +extern char * baseDir; +extern char * cmd_name[]; +extern char * backup_suffix; + +char * my_MCAST_ADDR = MCAST_ADDR; +char * my_IFname = MCAST_IF; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; + +void usage() +{ + fprintf(stderr, + "multicatcher (to receive files from multicaster) version %s\n" + " Option list:\n" + " [ -v ]\n" + " -------- essential options ----------------------------------\n" + " -t \n" + " -i \n" + " -------- options for backup ---------------------------------\n" + " [ -u for backup files if -b is on in multicaster ]\n" + " -------- mcast options --------------------------------------\n" + " [ -A **same as for multicaster ]\n" + " [ -P **same as for multicaster ]\n" + " [ -I ]\n", + VERSION, MCAST_ADDR, PORT); +} + +int main(int argc, char *argv[]) +{ + int old_mode; /* hp: from char to int for mode */ + int mode; + int c; + + while ((c = getopt(argc, argv, "v:A:P:t:i:u:I:")) != EOF) { + switch (c) { + case 'v': + verbose = atoi(optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'I': + my_IFname = optarg; + break; + case 't': + baseDir = optarg; + break; + case 'i': + machineID = atoi(optarg); + break; + case 'u': + backup_suffix = strdup(optarg); + break; + case '?': + usage(); + exit(BAD_EXIT); + } + } + + if (machineID < 0 || !baseDir) { + fprintf(stderr, "Essential options (-t -i) should be specified. \n"); + usage(); + exit(BAD_EXIT); + } + + fprintf(stderr, "my_pid= %lu\n", getpid()); + + if (!backup_suffix) default_suffix(); + get_tmp_suffix(); /* get a unique tmp_name for the tmp file */ + + init_page_reader(); + init_complaint_sender(); + + /* initialize random numbers */ + srand(time(NULL) + getpid()); + + /* set the timeout for readable() to be about 3 to 6 seconds + Actually, this setting is arbitrary. + The timeout of readable() does not play a role in + the logic flow. + */ + set_delay( 3 + rand() % 6, 0); + mode = old_mode = TEST; + + /* -----------------------The main loop--------------------------- + Multicatcher simply waits for any incoming UDP, + reads and handles it. + If the UDP contains file content, it is placed in the right place. + If the UDP contains an instruction, it is carried out. + + Multicatcher never complains unless being told so. + For example, as it is now, multicatcher does not complain + about the rate of incoming UDP being too fast to handle. + If multicatcher cannot keep up with the speed, it just + loses certain pages in a file which will be reported + later when multicaster requests acknowledgement. + ---------------------------------------------------------------- */ + while(1) { /* loop for all incoming pages */ + if (verbose>=2) + fprintf(stderr, "Starting listen loop with mode %d, old_mode = %d\n", mode, old_mode); + + /* the major task here */ + mode = read_handle_page(); + if (verbose>=2) fprintf(stderr, "new page in mode %d\n", mode); + + if (mode == ALL_DONE_CMD) break; + + /* for debugging purpose */ + if ((old_mode != mode)) { + if (verbose>=2 && mode <= 5) fprintf(stderr, "%s\n", cmd_name[mode]); + } + + /* got no data? */ + if (mode == TIMED_OUT) { + if (verbose>=2) fprintf(stderr, "*"); + } + + old_mode = mode; + } /* end of incoming page loop */ + + if (verbose>=1) fprintf(stderr, "Done!\n"); + return 0; +} diff --git a/page_reader.c b/page_reader.c new file mode 100644 index 0000000..8f349a0 --- /dev/null +++ b/page_reader.c @@ -0,0 +1,426 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" + +/* the following is needed on Sun but not on linux */ +#ifdef _SUN +#include +#endif + +extern int machineID; +extern int verbose; +extern char * my_MCAST_ADDR; /* defined in multicatcher.c */ +extern char * my_IFname; +extern int my_PORT; +extern unsigned int current_file_id; + +int isMonitor; /* flag = if this target machine is a designated monitor */ +int nPage_recv; /* counter for the number of pages received for a file */ +int machineState; /* there are four states during one file transmission */ +int isFirstPage=TRUE; /* flag */ + +/* The followings are used to determine sick condition */ +int current_missing_pages; +int last_missing_pages; +int sick_count; + +/* receive socket */ +int recfd; +#ifndef IPV6 +struct sockaddr_in rec_addr; +#else +struct sockaddr_in6 rec_addr; +#endif + +/* + Receive buffer for storing the data obtained from UDP + The format: + (5*sizeof(int) bytes header) + (PAGE_SIZE data area) + + The header has five int_type (4 bytes) int's. + (1) mode -- for master to give instructions to the target machines. + (2) current file index (starting with 1) + (3) current page index (starting with 1) + (4) bytes that has been sent in this UDP page + (5) total number of pages. + + data_ptr points to the data area. +*/ +int *mode_ptr; /* hp: change from char to int */ +int *total_pages_ptr; +int *current_page_ptr; +int *bytes_sent_ptr, *current_file_ptr; +char *data_ptr; +char rec_buf[PAGE_BUFFSIZE]; + +void init_page_reader() +{ + /*struct ip_mreq mreq;*/ + int rcv_size; + + machineState = IDLE_STATE; + isMonitor = FALSE; + + /* Prepare buffer pointers */ + mode_ptr = (int*)rec_buf; + current_file_ptr = (int *)(mode_ptr + 1); + current_page_ptr = (int *)(current_file_ptr + 1); + bytes_sent_ptr = (int *)(current_page_ptr + 1); + total_pages_ptr = (int *)(bytes_sent_ptr + 1); + data_ptr = (char *)(total_pages_ptr + 1); + + /* Set up receive socket */ + if (verbose>=2) fprintf(stderr, "setting up receive socket\n"); + recfd = rec_socket(&rec_addr, my_PORT); + + /* Join the multicast group */ + /*inet_pton(AF_INET, MCAST_ADDR, &(mreq.imr_multiaddr.s_addr)); + mreq.imr_multiaddr.s_addr = inet_addr(my_MCAST_ADDR); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + + if (setsockopt(recfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void*)&mreq, sizeof(mreq)) < 0){ + perror("Joining Multicast Group"); + } + */ + + if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) { + perror("Joining Multicast Group"); + } + + /* Increase socket receive buffer */ + rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE; + if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){ + perror("Expanding receive buffer for page_reader"); + } + +} + +/* + This is the heart of multicatcher. + It parses the incoming pages and do proper reactions according + to the mode (command code) encoded in the first 4 bytes in an UDP page. + It returns the mode. +*/ +int read_handle_page() +{ + #ifndef IPV6 + struct sockaddr_in return_addr; + #else + struct sockaddr_in6 return_addr; + #endif + int bytes_read; + socklen_t return_len = (socklen_t)sizeof(return_addr); + int mode_v, bytes_sent_v, total_pages_v, current_file_v, current_page_v; + + /* -----------receiving data -----------------*/ + if (readable(recfd) == 1) { /* there is data coming in */ + /* check_queue(); This might be useful. But need more study. */ + /* get data */ + bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0, + (struct sockaddr *)&return_addr, + (socklen_t*) &return_len); + + bytes_sent_v = ntohl(*bytes_sent_ptr); + if (bytes_read != (bytes_sent_v + HEAD_SIZE)) + return NULL_CMD; + + /* convert from network byte order to host byte order */ + mode_v = ntohl(*mode_ptr); + total_pages_v = ntohl(*total_pages_ptr); + current_file_v = ntohl(*current_file_ptr); + current_page_v = ntohl(*current_page_ptr); + + if (isFirstPage) { + update_complaint_address(&return_addr); + isFirstPage = FALSE; + } + + /* --- process various commands (modes) */ + switch (mode_v) { + case TEST: + /* It is just a test packet? */ + fprintf(stderr, "********** Received test packet **********\n"); + return mode_v; + + case SELECT_MONITOR_CMD: + if (current_page_v == machineID) { + isMonitor = TRUE; + send_complaint(MONITOR_OK, machineID, 0, 0); + } else { + isMonitor = FALSE; + } + return mode_v; + + case OPEN_FILE_CMD: + if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) + && machineState == IDLE_STATE) { + /* get info about this file */ + if (verbose>=1) + fprintf(stderr, "open file id= %d\n", current_file_v); + if (!extract_file_info(data_ptr, current_file_v, total_pages_v)) + return mode_v; + + /* different tasks here */ + /* open file, rmdir, unlink */ + if (total_pages_v < 0) { /* delete a file or a directory */ + if (!delete_file(TRUE)) return mode_v; /* this fx can be re-entered many times */ + /* machineState remains to be IDLE_STATE */ + } else if (total_pages_v == 0) { + /* handle an empty file, or (soft)link or directory */ + if (!check_zero_page_entry()) return mode_v; /* can re-enter many times */ + /* machineState remains to be IDLE_STATE */ + } else { /* a regular file */ + if (!open_file()) return mode_v; + /* the file has been opened. */ + sick_count = 0; + current_missing_pages =0; + last_missing_pages = nPages_for_file(current_file_v); + machineState = GET_DATA_STATE; + } + send_complaint(OPEN_OK, machineID, 0, current_file_id); /* ack */ + nPage_recv = 0; + return mode_v; + } + /* + We must be in GET_DATA_STATE. + OPEN_OK ack has been sent back in the previous block. + However, + the master may not have received the ack. + In that case the master will send back the open_file_cmd again. + */ + if ((current_page_v == (int) ALL_MACHINES ||current_page_v == (int) machineID) + && (machineState == GET_DATA_STATE)) { /****/ + send_complaint(OPEN_OK, machineID, 0, current_file_id); + } + return mode_v; + + case EOF_CMD: + /**********/ + if (verbose>=1) + fprintf(stderr, "***** EOF received for id=%d state=%d id=%d, file=%d\n", + current_page_v, machineState, machineID, current_file_v); + + /* the following happnens when this machine was previously out-of-pace + and was labeled as 'BAD MACHINE' by the master. + The master has proceeded with the syncing process without + waiting for this machine to finish the process in one of the previous files. + Since under normal condition, this machine should not expect to see + current_file changes except when OPEN_FILE_CMD is received. + */ + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + + /* normal condition */ + if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) + && machineState == GET_DATA_STATE) { /* GET_DATA_STATE */ + /* check missing pages and send back missing-page-request */ + current_missing_pages = ask_for_missing_page(); /* = total # of missing_pages */ + + if (current_page_v == (int) machineID) { + /* master is asking for my EOF_ack */ + if (current_missing_pages == 0) { + /* w/o assuming how we get to this point ... */ + machineState = DATA_READY_STATE; + send_complaint(EOF_OK, machineID, 0, current_file_id); + } else { + send_complaint(MISSING_TOTAL, machineID, + current_missing_pages , current_file_id); + missing_page_stat(); /* this has to be done before close_file() ******************/ + } + return mode_v; + } + + /*** + master is asking everyone, (after master sent or re-sent pages) + so, we do some book-keeping procedures (incl state change) + ***/ + if (verbose >=1) + fprintf(stderr, "missing_pages = %d, nPages_received = %d file = %d\n", + current_missing_pages, nPage_recv, current_file_v); /************/ + + nPage_recv = 0; + + if (current_missing_pages == 0) { + /* + There is no missing page. + Change the state. + */ + machineState = DATA_READY_STATE; + send_complaint(EOF_OK, machineID, 0, current_file_id); + return mode_v; + } else { + /* + There are missing pages. + If we still miss many pages for SICK_THRESHOLD consecutive times, + then we are sick. e.g. machine CPU does not give multicatcher + enough time to process incoming UDP's OR the disk I/O is too slow. + */ + + if ((SICK_RATIO)*(double)last_missing_pages < (double)current_missing_pages) { + ++sick_count; + if (sick_count > SICK_THRESHOLD) { + machineState = SICK_STATE; + send_complaint(SIT_OUT, machineID, + 0, current_file_id); /* no more attempt to receive */ + /* master may send more pages from requests from other machines + but this machine will mark this file as 'sits out receiving' */ + } else { + /* not sick enough yet */ + send_complaint(MISSING_TOTAL, machineID, + current_missing_pages, current_file_id); + missing_page_stat(); /* this has to be done before close_file() ******************/ + /* master will send more pages */ + } + } else { /* we are getting enough missing pages this time to keep up */ + sick_count = 0; /* break the consecutiveness */ + send_complaint(MISSING_TOTAL, machineID, + current_missing_pages, current_file_id); + missing_page_stat(); /* this has to be done before close_file() ******************/ + /* master will send more pages */ + } + last_missing_pages = current_missing_pages; + return mode_v; + } + } /* end GET_DATA_STATE */ + + /* After state change, we still get request for ack. + send back ack again */ + if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)) { + switch (machineState) { + case DATA_READY_STATE: + send_complaint(EOF_OK, machineID, + 0, current_file_id); + return mode_v; + case SICK_STATE: + send_complaint(SIT_OUT, machineID, + 0, current_file_id); /* just an ack, even for sick state*/ + return mode_v; + } + } + return mode_v; + + case CLOSE_FILE_CMD: + if (verbose>=1) + fprintf(stderr, "***** CLOSE received for id=%d state=%d id=%d, file=%d\n", + current_page_v, machineState, machineID, current_file_v); + + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + + if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) { + if (machineState == DATA_READY_STATE) { + if (!close_file()) { return mode_v; }; + set_owner_perm_times(); + machineState = IDLE_STATE; + send_complaint(CLOSE_OK, machineID, 0, current_file_id); + return mode_v; + } else if (machineState == IDLE_STATE) { + /* send ack back again because we are asked */ + send_complaint(CLOSE_OK, machineID, 0, current_file_id); + return mode_v; + } else { /* other states -- we should not be here*/ + /* if (machineState == SICK_STATE || machineState == GET_DATA_STATE) */ + /* SICK_STATE --> we are too slow in getting missing pages + if one of the machines is sick, master will send out CLOSE_ABORT + GET_DATA_STATE --> + We are not supposed to be in GET_DATA_STATE, + so consider it a sick_state */ + fprintf(stderr, "*** should not be here -- state=%d\n", machineState); + if (!rm_tmp_file()) { return mode_v; }; + machineState = IDLE_STATE; + send_complaint(SIT_OUT, machineID, 0, current_file_id); + /* make sick_count larger than threshold for GET_DATA_STATE */ + sick_count = SICK_THRESHOLD + 10000; + return mode_v; + } + } + return mode_v; + + case CLOSE_ABORT_CMD: + if (verbose>=1) + fprintf(stderr, "***** CLOSE_ABORT received for id=%d state=%d id=%d, file=%d\n", + current_page_v, machineState, machineID, current_file_v); + + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + + if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) { + if (!rm_tmp_file()) { return mode_v; }; + machineState = IDLE_STATE; + send_complaint(CLOSE_OK, machineID, 0, current_file_id); + } + return mode_v; + + case SENDING_DATA: + case RESENDING_DATA: + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + /* + otherwise, go ahead... + */ + if (verbose>=2) { + fprintf(stderr, "Got %d bytes from page %d of %d for file %d mode=%d\n", + bytes_read - HEAD_SIZE, + current_page_v, total_pages_v, + current_file_v + 1, mode_v); + } + + /* timing the disk IO */ + /* start_timer(); */ + + write_page(current_page_v, data_ptr, bytes_read - HEAD_SIZE); + if (isMonitor) send_complaint(PAGE_RECV, machineID, 0, current_file_id); + + /* + end_timer(); + update_time_accumulator(); + */ + + /* Yes, we have just read a page */ + ++nPage_recv; + return mode_v; + + case ALL_DONE_CMD: + /* + clear up the files. + */ + /*** since we do not know if there are other machines + that are NOT in data_ready_state, + to maintain equality, it is best to just + remove tmp_file without close_file() ***/ + rm_tmp_file(); + return mode_v; + + default: + return mode_v; + } /* end of switch */ + } else { + /* No, the read is timed out */ + return TIMED_OUT; + } +} + diff --git a/parse_synclist.c b/parse_synclist.c new file mode 100644 index 0000000..0c2a7aa --- /dev/null +++ b/parse_synclist.c @@ -0,0 +1,320 @@ +/* + Copyright (C) 2006-2008 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#include /* to define PATH_MAX */ + +/* + Get the next to-be-synced file from synclist which is the output of + parseRsyncList.C + The latter in turn parses the output from rsync. + In other words, we use rsync in dry-run mode to get the + files that need to be synced. + + to port to large_file environment: use off_t for size +*/ + +extern int verbose; + +char basedir[PATH_MAX]; /* baseDir for the syncing */ +FILE *fd; +struct stat st; /* stat for the current file */ + +unsigned int nEntries; +int cur_entry; /* file id -- <0 if it needs backup */ +unsigned int nPages; +unsigned int last_bytes; /* number of bytes for the last page */ +int toRmFirst = FALSE; /* flag rm existing file and then sync */ +int file_changed = FALSE; /* flag to indicate if file has been changed during syncing */ + +unsigned int total_pages; +off_t total_bytes; + +int isDelete, isHardlink; +char filename[PATH_MAX]; +char fullname[PATH_MAX]; +char linktar[PATH_MAX]; + +int init_synclist(char * synclist_path, char *bdir) +{ + char line[PATH_MAX]; + nEntries = 0; + strcpy(basedir, bdir); + + if ((fd = fopen(synclist_path, "r")) == NULL) { + fprintf(stderr, "Cannot open synclist file = %s \n", synclist_path); + return FAIL; + } + + while (fgets(line, PATH_MAX, fd) != NULL) nEntries++; + if (nEntries == 0) { + fclose(fd); + fprintf(stderr, "Empty entires in synclist file = %s\n", synclist_path); + return SUCCESS; /* OK, nothing to sync */ + } + rewind(fd); + + cur_entry = 0; + total_pages = 0; + total_bytes = 0; + + return SUCCESS; +} + +/* + pages_for_file calculates the number (int) of pages for the current file + and returns that number in an (int) type. + So, max_pages = 2**31 = 2147483648 + which corresponds to a file_size of 2**31 * 64512 = 1.38e14 + [ general limit = (1<<(sizeof(int)*8)) * PAGE_SIZE ] + At that time, the type of page_number needs to be upgraded :) + + to_delete -> -1 + normal_file -> number_of_pages + softlink -> 0 + hardlink -> 0 + directory -> 0 +*/ +int pages_for_file() +{ + if (isDelete) { /* to be deleted directory or file */ + return TO_DELETE; + } + + if (S_ISREG(st.st_mode)){ + int n; + if (st.st_nlink > 1) return 0; /* hardlink file */ + + n = (int)((st.st_size)/(off_t)PAGE_SIZE); /* regular file */ + if ((st.st_size)%((off_t)PAGE_SIZE) == 0) { + last_bytes = (unsigned int)(PAGE_SIZE); + return n; + } else { + last_bytes = (unsigned int)(st.st_size - (off_t)n * (off_t)PAGE_SIZE); + return n+1; + } + /*return ((st.st_size)%((off_t)PAGE_SIZE) == 0) ? n : n+1 ;*/ + } + if (S_ISLNK(st.st_mode)){ + return 0; /* softlink */ + } + return 0; /* directory */ +} + +off_t bytes_for_file() +{ + return st.st_size; +} + +unsigned int get_nPages() /* for this file */ +{ + return nPages; +} + +void strip(char * str) +{ + /* remove trailing \n and spaces */ + char *pc = &str[strlen(str)-1]; + while (*pc == ' ' || *pc == '\n') { + *(pc--) = '\0'; + } +} + +int same_stat_for_file() +{ + /* check if current stat is same as that when get_next_entry is called */ + struct stat st1; + + if(lstat(fullname, &st1) < 0) { + if (verbose >=1) perror(fullname); + return FAIL; + } + + if (st1.st_size != st.st_size || st1.st_mode != st.st_mode || + st1.st_mtime != st.st_mtime) { + return FAIL; /* the file has changed */ + } + return SUCCESS; + +} + +int is_hardlink_line(char * line) +{ + /* when line is in the form of + string1 string2 + it can be either a filename (string1 string2) + or a hardlink string1 => string2 + */ + struct stat st; + char fn[PATH_MAX]; + strcpy(fn, basedir); + strcat(fn, "/"); + strcat(fn, line); + + /* if the whole line is not a file, then we are dealing with hardlink case */ + return (lstat(fn, &st) < 0); + /* cannot deal with the situation + str1 is a file + str2 is a hardlink to str1 + str1 str2 is a file + AND if we need to sync + str1 and 'str1 str2' at the same time. + But this situation is very rare. */ +} + +int get_next_entry(int current_file_id) +{ + char *c; + char line[PATH_MAX]; + + /* inside this function, cur_entry is set to be positve to facilitate processing */ + cur_entry = current_file_id; /* from main loop's index, starting with 1 */ + + isDelete = FALSE; + isHardlink = FALSE; + + fgets(line, PATH_MAX, fd); + strip(line); + + if (current_file_id == nEntries) { + fclose(fd); /* close the synclist file */ + if (verbose>=2) fprintf(stderr, "no more entry in synclist.\n"); + } + + if (verbose>=2) { + fprintf(stderr, "Got current entry = %d (total= %d)\n", cur_entry, nEntries); + fprintf(stderr, "%s\n", line); + } + + strcpy(fullname, basedir); + strcat(fullname, "/"); + if (strncmp(line, "deleting ", 9)==0) { + isDelete = TRUE; + nPages = -1; + strcat(fullname, &line[9]); + strcpy(filename, &line[9]); + if (needBackup(fullname)) cur_entry = -cur_entry; + return SUCCESS; + } else if ((c = strchr(line, ' '))!=NULL && is_hardlink_line(line)) { + /* is it a hardlink -- two filenames separated by a space */ + char fn[PATH_MAX]; + isHardlink = TRUE; + strncpy(fn, line, (c - line)); + fn[c-line] = '\0'; + strcat(fullname, fn); + strcpy(filename, fn); + strcpy(linktar, c+1); + } else { + /* normal, single entry */ + strcat(fullname, line); + strcpy(filename, line); + } + + /* update stat */ + if(lstat(fullname, &st) < 0) { + if (verbose >=1) perror(fullname); + return FAIL; + } + + if (S_ISLNK(st.st_mode)) { + int linklen; + linklen = readlink(fullname, linktar, PATH_MAX); + /* readlink doesn't null-terminate the string */ + *(linktar + linklen) = '\0'; + } else if (st.st_nlink>1 && !isHardlink) { + /* this is the target file that others (hard)link to. + treat it like a normal file */ + st.st_nlink = 1; + } + + nPages = pages_for_file(); + if (nPages > 0) { /* for regular files */ + total_pages += nPages; + total_bytes += st.st_size; + } + + if (needBackup(fullname)) cur_entry = -cur_entry; + + file_changed = FALSE; + + return SUCCESS; +} + +void adjust_totals() +{ + if (nPages > 0) { + total_pages -= nPages; + total_bytes -= st.st_size; + } +} + +/* some accessors */ +unsigned int total_entries() { return nEntries; } + +int current_entry() { return cur_entry; } + +int is_softlink() { return S_ISLNK(st.st_mode); } + +int is_hardlink() { return isHardlink; } + +int is_directory() { return S_ISDIR(st.st_mode); } + +char * getFilename() { /* relative to basedir */ return &filename[0]; } + +char * getFullname() { return &fullname[0]; } + +/* + The following three fx are used to fill file_info into the send_buffer. + They return the number of bytes being written into the buf, including the \0 byte. +*/ +unsigned int fill_in_stat(char *buf) +{ + /* load into buf area the stat info in ascii format */ + if (isDelete) + sprintf(buf, "0 0 0 0 0 0 0 0"); + else { + #ifdef _LARGEFILE_SOURCE + sprintf(buf, "%u %u %u %u %llu %lu %lu %d", st.st_mode, st.st_nlink, + st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime, + toRmFirst); + #else + sprintf(buf, "%u %u %u %u %lu %lu %lu %d", st.st_mode, st.st_nlink, + st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime, + toRmFirst); + #endif + } + + return strlen(buf)+1; +} + +unsigned int fill_in_filename(char * buf) +{ + strcpy(buf, filename); + return strlen(buf)+1; +} + +unsigned int fill_in_linktar(char *buf) +{ + strcpy(buf, linktar); + return strlen(buf)+1; +} + + diff --git a/proto.h b/proto.h new file mode 100644 index 0000000..6569d44 --- /dev/null +++ b/proto.h @@ -0,0 +1,182 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __main_proto_h +#define __main_proto_h + +/* parse_synclist.c */ +unsigned int total_entries(); +unsigned int fill_in_stat(char *buf); +unsigned int fill_in_linktar(char *buf); +unsigned int fill_in_filename(char * buf); +unsigned int get_nPages(); +int pages_for_file(); +char * getFilename(); +char * getFullname(); +int same_stat_for_file(); +void strip(char * str); +int current_entry(); +int get_next_entry(int current_file_id); +int is_softlink(); +int is_directory(); +int is_hardlink(); +int init_synclist(char * synclist_path, char *bdir); +void adjust_totals(); + +/* backup.c */ +int read_backup_pattern(char * fpat_file); +int needBackup(char * filename); + +/* sends.c */ +void init_sends(); +void set_mode(int new_mode); +int send_page(int page); +void send_test(); +void send_cmd(int code, int machine_id); +void send_all_done_cmd(); +int fexist(int entry) ; +void pack_open_file_info(); +void my_exit(int); + +/* complaints.c */ +void init_complaints(); +int read_handle_complaint(int cmd); +void wait_for_ok(int code); +void refresh_machine_status(); +void refresh_missing_pages(); +void mod_machine_status(); +void refresh_file_received(); +int nNotRecv(); +int iNotRecv(); +int is_it_missing(int page); +int has_missing_pages(); +int has_sick_machines(); +void init_missing_page_flag(int n); +void free_missing_page_flag(); +void refresh_machine_status(); +void init_machine_status(int n); +void page_sent(int page); +int nBadMachines(); +void do_badMachines_exit(); +int pr_missing_pages(); +int send_done_and_pr_msgs(double, double); +void do_cntl_c(int signo); +void set_has_missing(); +void reset_has_missing(); +void set_has_sick(); +void reset_has_sick(); + + +/* setup_socket.c */ +void set_delay(int secs, int usecs); +void get_delay(int * secs, int * usecs); +int readable(int fd); +#ifndef IPV6 +int complaint_socket(struct sockaddr_in *addr, int port); +int send_socket(struct sockaddr_in *addr, char * cp, int port); +int rec_socket(struct sockaddr_in *addr, int port); +#else +int rec_socket(struct sockaddr_in6 *addr, int port); +int send_socket(struct sockaddr_in6 *addr, char * cp, int port); +int complaint_socket(struct sockaddr_in6 *addr, int port); +#endif + +/* set_mcast.c */ +int mcast_set_if(int sockfd, const char *ifname, u_int ifindex); +int mcast_set_loop(int sockfd, int onoff); +int mcast_set_ttl(int sockfd, int val); + +/* set_catcher_mcast.c */ +int Mcast_join(int sockfd, const char *mcast_addr, + const char *ifname, u_int ifindex); +void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr); + +/* complaint_sender.c */ +void fill_in_int(int i); +void init_fill_ptr(); +void send_complaint(int complaint, int mid, int page, int file); +void init_complaint_sender(); +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa); +#else +void update_complaint_address(struct sockaddr_in6 *sa); +#endif + +/* page_reader.c */ +void init_page_reader(); +int check_queue(); +int read_handle_page(); + +/* file_operations.c */ +void get_tmp_suffix(); +int extract_file_info(char * buf, int n_file, unsigned int n_pages); +int open_file(); +int close_file(); +int rm_tmp_file(); +int delete_file(int to_check_dir_type); +int touch_file(); +int nPages_for_file(); +int has_all_pages(); +int ask_for_missing_page(); +void missing_page_stat(); +void write_page(int page, char* data_ptr, int bytes); +int is_missing(int page); +void page_received(int page); +int set_owner_perm_times(); +void close_last_file(); +int check_zero_page_entry(); +void default_suffix(); + +/* timing */ +void refresh_timer(); +double get_accumulated_time(); +void start_timer(); +void end_timer(); +void update_time_accumulator(); +double get_accumulated_usec(); +void update_rtt_hist(unsigned int rtt); +void pr_rtt_hist(); +void init_rtt_hist(); +unsigned int pages_wo_ack(); + +/* signal.c */ +typedef void Sigfunc(int); /* for signal handlers */ +Sigfunc * Signal(int signo, Sigfunc *func); +int Fcntl(int fd, int cmd, int arg); +int Ioctl(int fd, int request, void *arg); +void Sigemptyset(sigset_t *set); +void Sigaddset(sigset_t *set, int signo); +void Sigprocmask(int how, const sigset_t *set, sigset_t *oset); + +/* id_map.c */ +void get_machine_names(char * filename); +char * id2name(int id); + +#endif diff --git a/rtt.c b/rtt.c new file mode 100644 index 0000000..71c8bd0 --- /dev/null +++ b/rtt.c @@ -0,0 +1,258 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +/* + To measure round trip time (RTT) using UDP +*/ + +#include "rttmain.h" +#include /* to define PATH_MAX */ +#include + +char * my_MCAST_ADDR = MCAST_ADDR; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; +int my_TTL = MCAST_TTL; +int my_LOOP = MCAST_LOOP; +char * my_IFname = MCAST_IF; + +int no_feedback_count; +char * machine = NULL; +int remote_pid; +char * reshell = REMOTE_SHELL; +char catcher_path[PATH_MAX]; + +void usage() +{ + fprintf(stderr, + "rtt (to measure rount trip time to a target version %s\n" + " Option list:\n" + " [ -v flag to turn on verbose]\n" + " [ -r ]\n" + " [ -p ]\n" + " -------- essential options ----------------------------------\n" + " -m \n" + " -n \n" + " -s \n" + " -------- mcast options --------------------------------------\n" + " [ -A ]\n" + " [ -P ]\n" + " [ -T ]\n" + " [ -L flag turn on mcast_LOOP. default is off ]\n" + " [ -I ]\n", + VERSION, reshell, catcher_path, my_MCAST_ADDR, my_PORT, my_TTL); +} + +void get_dirname_of_rtt(char *dname, char *rtt_path) +{ + char path[PATH_MAX]; + strcpy(path, rtt_path); /* dirname() will change its argument */ + strcpy(dname, dirname(path)); + + if (strcmp(dname, ".")==0) { + strcpy(dname, getcwd(path, PATH_MAX)); + } +} + +void do_one_page(int page) +{ + unsigned long rtt; + refresh_timer(); + start_timer(); + send_page(page); + /* read_handle_complaint() waits n*interpage_interval at most */ + if (read_handle_complaint()==0) { /* delay_sec for readable() is set by set_delay() */ + /* + At this point, the readable() returns without getting a reply + from the target after n*DT_PERPAGE + This indicates that the page has likely been lost in the network. + */ + if (verbose) fprintf(stderr, "no ack for page = %d\n", page); + ++no_feedback_count; + update_rtt_hist(999999); /* register this as rtt = infinite --- the last element in rtt_hist */ + if (no_feedback_count>NO_FEEDBACK_COUNT_MAX) { /* count the consecutive no_feedback event */ + /* switch to another client */ + fprintf(stderr, "Consecutive non_feedback exceeds limit. Continue with next page...\n"); + no_feedback_count = 0; + } + } else { + end_timer(); + update_time_accumulator(); + rtt = get_accumulated_usec(); + /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */ + /* to do: update histogram */ + if (verbose>=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt); + update_rtt_hist(rtt); + + no_feedback_count = 0; + } +} + +int invoke_catcher(char * machine) +{ + FILE *ptr; + char buf[PATH_MAX]; + + /* invoke rttcatcher on remote machine */ + fprintf(stderr, "using %s to invoke rttcatcher on %s\n", reshell, machine); + + /* check if rsh (ssh) works */ + sprintf(buf, "%s %s date", reshell, machine); + if (verbose) fprintf(stderr, "%s\n", buf); + if (system(buf)) { + fprintf(stderr, "cannot do rsh to the target machine = %s\n", machine); + exit(BAD_EXIT); + } + + if (!my_IFname) { + sprintf(buf, + "%s %s '%s/rttcatcher -A %s -P %d < /dev/null 1>/dev/null 2>/dev/null & echo $!'", + reshell, machine, catcher_path, my_MCAST_ADDR, my_PORT); + } else { + sprintf(buf, + "%s %s '%s/rttcatcher -A %s -P %d -I %s < /dev/null 1>/dev/null 2>/dev/null & echo $!'", + reshell, machine, catcher_path, my_MCAST_ADDR, my_PORT, my_IFname); + } + + + fprintf(stderr, "%s\n", buf); + if ((ptr = popen(buf, "r")) == NULL) { + fprintf(stderr, "Failure to invoke rttcather\n"); + exit(-1); + } + fgets(buf, PATH_MAX, ptr); + pclose(ptr); + + return atoi(buf); +} + +int main(int argc, char *argv[]) +{ + int c; + int nPages =-1, pageSize=-1, ipage; + + verbose = 0; + catcher_path[0] = '\0'; + + while ((c = getopt(argc, argv, "vm:s:n:r:p:A:P:T:LI:")) != EOF) { + switch (c) { + case 'v': + verbose = 1; + break; + case 'r': + reshell = optarg; + break; + case 'p': + strcpy(catcher_path, optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'T': + my_TTL = atoi(optarg); + break; + case 'L': + my_LOOP = TRUE; + break; + case 'I': + my_IFname = optarg; + break; + case 'n' : + nPages = atoi(optarg); + break; + case 'm': + machine = optarg; + break; + case 's': + pageSize = atoi(optarg); + if (pageSize>MAX_PAGE_SIZE) { + usage(); + exit(-1); + } + break; + case '?': + usage(); + exit(-1); + } + } + + if (strlen(catcher_path)==0) { + get_dirname_of_rtt(catcher_path, argv[0]); + } + + + if (nPages == -1 || pageSize == -1 || machine == NULL) { + fprintf(stderr, "Essential options (-n -m -s) should be specified. \n"); + usage(); + exit(-1); + } + + /* init */ + init_sends(pageSize); + init_complaints(); + + /* set up Cntl_C catcher */ + Signal(SIGINT, do_cntl_c); + + remote_pid = invoke_catcher(machine); + fprintf(stderr, "remote pid = %d\n", remote_pid); + + sleep(1); + + /* -------------------Send data-------------------------------------- */ + + init_missing_page_flag(nPages); + + send_cmd(START_CMD, nPages); + refresh_machine_status(); + wait_for_ok(START_CMD); + do_badMachines_exit(machine, remote_pid); + + fprintf(stderr, "Sending data...\n"); + /* send pages */ + set_mode(SENDING_DATA); + set_delay(0, DT_PERPAGE*FACTOR); + + no_feedback_count = 0; + for (ipage = 0; ipage < nPages; ipage++){ + do_one_page(ipage); + } + + send_cmd(EOF_CMD, 0); + refresh_machine_status(); + wait_for_ok(EOF_CMD); + do_badMachines_exit("", -1); + + /* -----------------end of send data -------------------------------- */ + + free_missing_page_flag(); + send_done_and_pr_msgs(); + return 0; +} + diff --git a/rttcatcher.c b/rttcatcher.c new file mode 100644 index 0000000..1af74f1 --- /dev/null +++ b/rttcatcher.c @@ -0,0 +1,118 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + Codes in this file are extracted and modified from multicatcher.c. + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +char * my_MCAST_ADDR = MCAST_ADDR; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; +char * my_IFname = MCAST_IF; + +void usage() +{ + fprintf(stderr, + "rttcatcher (to receive pages on the target. version %s\n" + " Option list:\n" + " [ -v flag to turn on verbose]\n" + " -------- mcast options --------------------------------------\n" + " [ -A ]\n" + " [ -P ]\n" + " [ -I ]\n", + VERSION, MCAST_ADDR, PORT); +} + +int main(int argc, char *argv[]) +{ + int old_mode; /* hp: from char to int for mode */ + int mode; + int c; + + verbose = 0; + while ((c = getopt(argc, argv, "vA:P:I:")) != EOF) { + switch (c) { + case 'v': + verbose = 1; + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'I': + my_IFname = optarg; + break; + case '?': + usage(); + exit(-1); + } + } + + init_page_reader(); + init_complaint_sender(); + + /* initialize random numbers */ + srand(time(NULL) + getpid()); + + /* Wait forever if necessary for first packet */ + set_delay(0, -1); + mode = old_mode = TEST; /* hp: add mode */ + + while(1) { /* loop for all incoming pages */ + if (verbose) + fprintf(stderr, "Starting listen loop with mode %d\n", mode); + + mode = read_handle_page(); + if (verbose) fprintf(stderr, "in mode %d\n", mode); + + if (mode == ALL_DONE_CMD) break; + + /* got no data? */ + if (mode == TIMED_OUT) { + if (verbose) fprintf(stderr, "*"); + continue; + } /* end if TIMED_OUT */ + + /* changing modes? */ + if ((old_mode != SENDING_DATA) && (mode == SENDING_DATA)){ + /* Taking data, wait at least 3 to 8 seconds */ + set_delay( 3 + rand() % 8, 200); + if (verbose) fprintf(stderr, "Receiving data\n"); + old_mode = mode; + continue; + } + + /* all other modes */ + old_mode = mode; + + } /* end of incoming page loop */ + + if (verbose) fprintf(stderr, "Done!\n"); + return 0; +} + + diff --git a/rttcomplaint_sender.c b/rttcomplaint_sender.c new file mode 100644 index 0000000..20d7cda --- /dev/null +++ b/rttcomplaint_sender.c @@ -0,0 +1,103 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + Codes in this file are extracted and modified from complaint_sender.c + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +/* send socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +extern int my_FLOW_PORT; + +/* send buffer */ +char complaint_buffer[FLOW_BUFFSIZE]; +int *ccode_ptr; /* change from char to int -- mem alignment */ +int *cpage_ptr; + +/*---------------------------------------------------------- + init_complaint_sender initializes the buffer to allow the + catcher to send complaints back to the sender. + + ret_address of sender to whom we will complain + is determined when we receive the first UDP data + in read_handle_page() in page_reader.c + ----------------------------------------------------------*/ +void init_complaint_sender() +{ + if (verbose) + fprintf(stderr, "in init_complaint_sender\n"); + + /* init the send_socket */ + complaint_fd = complaint_socket(&complaint_addr, my_FLOW_PORT); + + ccode_ptr = (int *) complaint_buffer; + cpage_ptr = (int *)(ccode_ptr + 1); +} + +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin_addr); +} +#else +void update_complaint_address(struct sockaddr_in6 *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin6_addr); +} +#endif + +/*------------------------------------------------------------------------ + send_complaint fills the complaint buffer and send it through our socket + back to the sender + + The major use is to tell master machine which page of which file + needs to be re-transmitted. + complaint -- the complain code defined in main.h + file -- the file index + page -- page index + ------------------------------------------------------------------------*/ +void send_complaint(int complaint, int page) +{ + /* fill in the complaint data */ + /* 20060323 add converting to network byte-order before sending out */ + *ccode_ptr = htonl(complaint); + *cpage_ptr = htonl(page); + + /* send it */ + if( sendto(complaint_fd, complaint_buffer, FLOW_BUFFSIZE, 0, + (const struct sockaddr *)&complaint_addr, + sizeof(complaint_addr)) < 0) { + perror("Sending complaint\n"); + } + if (verbose) + printf("Sent complaint:code=%d page=%d\n", complaint, page); +} diff --git a/rttcomplaints.c b/rttcomplaints.c new file mode 100644 index 0000000..4f9ed9b --- /dev/null +++ b/rttcomplaints.c @@ -0,0 +1,270 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + codes in this file are extracted and modified from complaints.c + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" +#include + +/* buffer for receiving complaints */ + +char flow_buff[FLOW_BUFFSIZE]; +int *code_ptr; /* What's wrong? */ +int *page_ptr; /* Which page */ + +/* receive socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +extern int my_FLOW_PORT; + +/* status */ +char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */ +int total_missing_page = 0; +char machine_status = NOT_READY; +int nMachines = 1; +int nPages; +char *machine_list_file; + +extern char * machine; +extern int remote_pid; +extern char * reshell; + +/* + init_complaints initializes our buffers to receive complaint information + from the catchers +*/ +void init_complaints () +{ + if (verbose) + fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE); + + /* Buffer */ + code_ptr = (int *)flow_buff; + page_ptr = (int *)(code_ptr + 1); + + /* Receive socket */ + if (verbose) printf("set up receive socket for complaints\n"); + complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT); +} + +void init_missing_page_flag(int n) +{ + int i; + nPages = n; + if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) { + fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n); + perror("error = "); + exit(-1); + } + for(i=0; i nPages) return 1; /* *page_ptr = page # (1 origin) */ + ++(total_missing_page); + missing_page_flag[(page_v)-1] = MISSING; + return 1; + case LAST_MISSING : + if (page_v > nPages) return 1; + ++(total_missing_page); + missing_page_flag[page_v-1] = MISSING; + machine_status = MACHINE_OK; + return 1; + default : + printf("Unknown complaint: %d\n", code_v); + return 0; + } /* end of switch */ + } /* end of if(readable) */ + + return 0; +} + +int all_machine_ok() +{ + return (machine_status == NOT_READY ) ? 0 : 1; +} + +void wait_for_ok(int code) +{ + int i, count; + time_t tloc; + time_t rtime0, rtime1; + + rtime0 = time(&tloc); /* reference time */ + + count = 0; + while (!all_machine_ok()) { + if (read_handle_complaint()==1) { /* if there is a complaint handled */ + rtime0 = time(&tloc); /* reset the reference time */ + continue; + } + /* no complaints handled */ + rtime1 = time(&tloc); /* time since last complaints */ + if ((rtime1-rtime0) >= ACK_WAIT_PERIOD) { + ++count; + if (count < ACK_WAIT_TIMES) { + fprintf(stderr, "%d: resend cmd(%d) to machines:[ ", count, code); + for(i=0; i 0) { + kill_pid(); + } + + exit(-1); +} diff --git a/rttmain.h b/rttmain.h new file mode 100644 index 0000000..78eb693 --- /dev/null +++ b/rttmain.h @@ -0,0 +1,126 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __main_h +#define __main_h + +#include +#include +#include +#include +#include +#include +#include +#include /* sockaddr_in{} and other Internet defns */ +#include /* inet(3) functions */ +#include +#include /* for nonblocking */ +#include +#include +#include +#include +#include +#include /* for S_xxx file mode constants */ +#include /* for iovec{} and readv/writev */ +#include +#include +#include /* timeval{} for select() */ + +#define VERSION "3.1.0" + +/* logic values */ +#define FALSE 0 +#define TRUE 1 +#define FAIL (FALSE) +#define SUCCESS (TRUE) +#define GOOD_EXIT 0 +#define BAD_EXIT -1 + +/* Ports and addresses */ +#define PORT 7900 /* for multicast */ +#define FLOW_PORT (PORT-1) /* for flow-control */ +#define MCAST_ADDR "239.255.67.200" +#define MCAST_TTL 1 +#define MCAST_LOOP FALSE +#define MCAST_IF NULL + +#define REMOTE_SHELL "rsh" + +#define NO_FEEDBACK_COUNT_MAX 5 +#define USEC_TO_IDLE 1000000 + +/* Speed stuff */ +#define FAST 100 /* usec */ +#define DT_PERPAGE 8000 /* usec time interval between pages */ +#define FACTOR 50 + +/* time for the master to wait for the acknowledgement */ +#define ACK_WAIT_PERIOD 1 /* secs (from time()); */ +#define ACK_WAIT_TIMES 60 /* wait for this many periods */ + +/* complaints */ +#define TOO_FAST 100 +#define SEND_AGAIN 200 +#define START_OK 300 +#define MISSING_PAGE 500 +#define LAST_MISSING 600 +#define EOF_OK 700 +#define PAGE_RECV 800 + +#define FLOW_BUFFSIZE (2 * sizeof(int)) + +#define PAGE_SIZE 64512 /* max page_size allowed */ +#define HEAD_SIZE (3 * sizeof(int)) +#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE) +#define TOTAL_REC_PAGE 20 /* 31 20 */ + +/* Modes */ +#define TIMED_OUT 0 +#define TEST 1 +#define SENDING_DATA 2 +#define RESENDING_DATA 3 +#define START_CMD 4 +#define EOF_CMD 5 +#define ALL_DONE_CMD 6 +#define NULL_CMD 7 + +/* machine status */ +#define MACHINE_OK '\1' +#define NOT_READY '\0' + +/* PAGE STATUS */ +#define MISSING '\0' +#define RECEIVED '\1' + +/* MACHINE STATE */ +#define IDLE_STATE 0 +#define GET_DATA_STATE 1 +#define DATA_READY_STATE 2 + +#define MAX_PAGE_SIZE 64512 + +int verbose; + +#include "rttproto.h" + +#endif diff --git a/rttmissings.c b/rttmissings.c new file mode 100644 index 0000000..774e8b2 --- /dev/null +++ b/rttmissings.c @@ -0,0 +1,93 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +char * missingPages = NULL; /* array of flags */ +int nPages; + +int init_missingPages(int n) +{ + int i; + + nPages = n; + + /* init missingPages flags */ + if (missingPages != NULL) free(missingPages); /* for second round */ + missingPages = malloc(sizeof(char) * nPages); + for(i=0; i < nPages; ++i) + missingPages[i] = MISSING; + + return 0; +} + +int get_total_pages() +{ + return nPages; +} + +int missing_pages() +{ + int result; + int i; + + result = 0; + + for(i=0; i < nPages; ++i) + if ((missingPages[i]) == MISSING) ++result; + return result; +} + +int is_missing(int page) +{ + return (missingPages[page] == MISSING) ? 1 : 0; +} + +void page_received(int page) +{ + missingPages[page] = RECEIVED; +} + +int ask_for_missing_page() +{ + int i; + int n, count; + + n = missing_pages(); + if (n == 0) { + /* send_complaint(EOF_OK, machineID, 0); */ + return 0; /* nothing is missing */ + } + + count = 0; + for(i=0; i < nPages; ++i) { + if ( missingPages[i] == MISSING ) { + ++count; + send_complaint((count==n) ? LAST_MISSING : MISSING_PAGE, i+1); + usleep(DT_PERPAGE); + } + } + + return 1; /* there is something missing */ +} + diff --git a/rttpage_reader.c b/rttpage_reader.c new file mode 100644 index 0000000..35940e5 --- /dev/null +++ b/rttpage_reader.c @@ -0,0 +1,188 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + Codes in this file are extraced and modified from page_reader.c + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +/* the following is needed on Sun but not on linux */ +#ifdef _SUN +#include +#endif + +extern char * my_MCAST_ADDR; /* defined in rttcatcher.c */ +extern int my_PORT; +extern char * my_IFname; +int machineState; +int isFirstPage = TRUE; + +int recfd; +#ifndef IPV6 +struct sockaddr_in rec_addr; +#else +struct sockaddr_in6 rec_addr; +#endif + +int *mode_ptr; /* change from char to int */ +int *total_bytes_ptr; +int *current_page_ptr; +char *data_ptr; +char rec_buf[PAGE_BUFFSIZE]; + +void init_page_reader() +{ + struct ip_mreq mreq; + int rcv_size; + + machineState = IDLE_STATE; + + /* Prepare buffer */ + mode_ptr = (int*)rec_buf; /* hp: add the cast (int *) */ + current_page_ptr = (int *)(mode_ptr + 1); + total_bytes_ptr = (int *)(current_page_ptr + 1); + data_ptr = (char *)(total_bytes_ptr + 1); + + /* Set up receive socket */ + if (verbose) fprintf(stderr, "setting up receive socket\n"); + recfd = rec_socket(&rec_addr, my_PORT); + + /* Join the multicast group */ + if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) { + perror("Joining Multicast Group"); + } + + /* Increase socket receive buffer */ + rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE; + if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){ + perror("Expanding receive buffer"); + } +} + + +/* + This is the heart of catcher. + It parses the incoming pages and do proper reactions according + to the mode (command code) encoded in the first 4 bytes in an UDP page. + It returns the mode. + + Note: since rtt is intended to deal with one-to-one machine, + the four-state engine as in page_reader is not used. +*/ +int read_handle_page() +{ + #ifndef IPV6 + struct sockaddr_in return_addr; + #else + struct sockaddr_in6 return_addr; + #endif + + int bytes_read; + socklen_t return_len = (socklen_t)sizeof(return_addr); + int mode_v, total_bytes_v, current_page_v; + + /* -----------receiving data -----------------*/ + if (readable(recfd) == 1) { /* there is data coming in */ + /* get data */ + bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0, + (struct sockaddr *)&return_addr, + (socklen_t*) &return_len); + + total_bytes_v = ntohl(*total_bytes_ptr); + if (bytes_read != total_bytes_v) return NULL_CMD; + + /* convert from network byte order to host byte order */ + mode_v = ntohl(*mode_ptr); + current_page_v = ntohl(*current_page_ptr); + + if (isFirstPage) { + update_complaint_address(&return_addr); + isFirstPage = FALSE; + } + + /* get init wish list and return address first time only + if (firstTime){ + if (verbose) + fprintf(stderr, "Initializing complaint_sender and wish_list\n"); + init_complaint_sender(&return_addr); + firstTime = 0; + } + */ + + /* --- process various commands */ + switch (mode_v) { + case TEST: + /* It is just a test packet? */ + fprintf(stderr, "********** Received test packet **********\n"); + return mode_v; + + case START_CMD: + if (verbose) + fprintf(stderr, "Start cmd received ---\n"); + init_missingPages(current_page_v); /* Here: use current_page for total_pages */ + send_complaint(START_OK, 0); + return mode_v; + + case EOF_CMD: + if (verbose) + fprintf(stderr, "Check and ask for missing pages ---\n"); + + if (ask_for_missing_page()==0) { + /* + There is no missing page. + */ + send_complaint(EOF_OK, 0); + } + /* + else + There are missing pages. + Ack has been done in ask_for_missing_page() + */ + + return mode_v; + + case SENDING_DATA: + case RESENDING_DATA: + if (verbose){ + fprintf(stderr, "Got %d bytes from page %d of %d, mode=%d\n", + bytes_read - HEAD_SIZE, + current_page_v, get_total_pages(), mode_v); + } + + /* mode == SENDING_DATA, RESENDING_DATA */ + page_received(current_page_v); + send_complaint(PAGE_RECV, 0); + + /* Yes, we read a page */ + return mode_v; + case ALL_DONE_CMD: /* this is presumably a good machine */ + default: + return mode_v; + } /* end of switch */ + } else { + /* No, the read timed out */ + return TIMED_OUT; + } +} + + diff --git a/rttproto.h b/rttproto.h new file mode 100644 index 0000000..6d4cf62 --- /dev/null +++ b/rttproto.h @@ -0,0 +1,116 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __rttproto_h +#define __rttproto_h + +/* rttsends */ +void init_sends(int n); +void set_mode(int new_mode); +int send_page(int page); +void send_cmd(int code, int machine_id); +void send_all_done_cmd(); + +/* rttcomplaints */ +void init_complaints(); +int read_handle_complaint(); +void wait_for_ok(int code); +void refresh_machine_status(); +int is_it_missing(int page); +int has_missing_pages(); +void init_missing_page_flag(int n); +void free_missing_page_flag(); +void refresh_machine_status(); +int get_total_missing_pages(); +void page_sent(int page); +void pr_missing_pages(); +void do_cntl_c(int signo); +void send_done_and_pr_msgs(); +void do_badMachines_exit(char * machine, int pid); + +/* setup_socket.c */ +void set_delay(int secs, int usecs); +int readable(int fd); +#ifndef IPV6 +int complaint_socket(struct sockaddr_in *addr, int port); +int send_socket(struct sockaddr_in *addr, char * cp, int port); +int rec_socket(struct sockaddr_in *addr, int port); +#else +int rec_socket(struct sockaddr_in6 *addr, int port); +int send_socket(struct sockaddr_in6 *addr, char * cp, int port); +int complaint_socket(struct sockaddr_in6 *addr, int port); +#endif + +/* set_mcast.c */ +int mcast_set_if(int sockfd, const char *ifname, u_int ifindex); +int mcast_set_loop(int sockfd, int onoff); +int mcast_set_ttl(int sockfd, int val); + +/* set_catcher_mcast.c */ +int Mcast_join(int sockfd, const char *mcast_addr, + const char *ifname, u_int ifindex); +void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr); + +/* rttcomplaint_sender */ +void send_complaint(int complaint, int page); +void init_complaint_sender(); +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa); +#else +void update_complaint_address(struct sockaddr_in6 *sa); +#endif + +/* rttpage_reader */ +void init_page_reader(); +int read_handle_page(); + +/* rttmissings */ +int init_missingPages(int n); +int missing_pages(); +int is_missing(int page); +void page_received(int page); +int ask_for_missing_page(); +int get_total_pages(); + +/* timing */ +void refresh_timer(); +double get_accumulated_time(); +void start_timer(); +void end_timer(); +void update_time_accumulator(); +double get_accumulated_usec(); +void update_rtt_hist(unsigned int rtt); +void pr_rtt_hist(); +void init_rtt_hist(); + +/* signal.c */ +typedef void Sigfunc(int); /* for signal handlers */ +Sigfunc * Signal(int signo, Sigfunc *func); +int Fcntl(int fd, int cmd, int arg); +int Ioctl(int fd, int request, void *arg); +void Sigemptyset(sigset_t *set); +void Sigaddset(sigset_t *set, int signo); +void Sigprocmask(int how, const sigset_t *set, sigset_t *oset); + +#endif diff --git a/rttsends.c b/rttsends.c new file mode 100644 index 0000000..7038e2c --- /dev/null +++ b/rttsends.c @@ -0,0 +1,144 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + codes in this file are extracted and modified from sends.c + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" +#include +#ifdef _SUN +#include /* define SIOCGIFADDR */ +#else +#include +#endif + +extern char * my_MCAST_ADDR; +extern int my_PORT; +extern int my_TTL; +extern int my_LOOP; +extern char * my_IFname; + +/* buffer for sending (same structure as those in sends.c */ +int *mode_ptr; /* char type would cause alignment problem on Sparc */ +int *current_page_ptr; +int *total_bytes_ptr; +char *data_ptr; +char send_buff[PAGE_BUFFSIZE]; + +int pageSize; + +/* Send socket */ +int send_fd; +#ifndef IPV6 +struct sockaddr_in send_addr; +#else +struct sockaddr_in6 send_addr; +#endif + +/* + set_mode sets the caster into a new mode. + modes are defined in main.h: +*/ +void set_mode(int new_mode) +{ + *mode_ptr = htonl(new_mode); +} + + +/* init_sends initializes the send buffer */ +void init_sends(int npagesize) +{ + pageSize = (npagesize>PAGE_SIZE) ? PAGE_SIZE : npagesize; + + mode_ptr = (int *)send_buff; /* hp: add (int*) */ + current_page_ptr = (int *) (mode_ptr + 1); + total_bytes_ptr = (int *)(current_page_ptr + 1); + data_ptr = (char *)(total_bytes_ptr + 1); + + send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT); + + /******* change MULTICAST_IF ********/ + if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0) + perror("init_sends(): when set MULTICAST_IF\n"); + + /* set multicast_ttl such that UDP can go to 2nd subnetwork */ + if (mcast_set_ttl(send_fd, my_TTL) < 0) + perror("init_sends(): when set MULTICAST_TTL\n"); + + /* disable multicast_loop such that there is no echo back on master */ + if (mcast_set_loop(send_fd, my_LOOP) < 0) + perror("init_sends(): when set MULTICAST_LOOP\n"); + + /* put dummy contents into send (UDP) buffer */ + memset(data_ptr, 1, PAGE_SIZE); +} + +/* + send_buffer will send the buffer with the file information + out to the socket connection with the catcher. +*/ +int send_buffer(int bytes_read) +{ + /* Else send the data */ + if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE, + 0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) { + perror("Sending packet"); + exit(1); + } + return (1); +} + +/* + send_page takes a page from the current file and controls + sending it out the socket to the catcher. It calls send_buffer + to do the actuall call to sendto. +*/ +int send_page(int page) +{ + if (verbose>=2) fprintf(stderr, "in send_page\n"); + *total_bytes_ptr = htonl(pageSize+HEAD_SIZE); + *current_page_ptr = htonl(page); + + return send_buffer(pageSize); +} + + +void send_cmd(int code, int pages) +{ + *mode_ptr = htonl(code); + *current_page_ptr = htonl(pages); + *total_bytes_ptr = htonl(HEAD_SIZE); + + send_buffer(0); +} + + +void send_all_done_cmd() +{ + *mode_ptr = htonl(ALL_DONE_CMD); + *current_page_ptr = 0; + *total_bytes_ptr = htonl(HEAD_SIZE) ; + + send_buffer(0); + if (verbose) fprintf(stderr, "(ALL DONE)\n"); +} diff --git a/sends.c b/sends.c new file mode 100644 index 0000000..61aa07a --- /dev/null +++ b/sends.c @@ -0,0 +1,329 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + Following the suggestion by Robert Dack , + I added the option to change the default IP address for multicasting + and the PORT for flow control. See mrsync.py for the new options. + + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#ifdef _SUN +#include /* define SIOCGIFADDR */ +#else +#include +#endif + +extern char * my_MCAST_ADDR; /* defined in multicaster.c */ +extern int my_PORT; /* ditto */ +extern int my_TTL; +extern int my_LOOP; +extern char * my_IFname; /* defined in multicaster.c */ +extern int verbose; + +extern unsigned int nPages; +extern unsigned int last_bytes; /* the number of bytes in the last page of a file */ +extern int cur_entry; +extern int file_changed; +extern char* cmd_name[]; + +/* Where have we last sent from? */ +int most_recent_file; +int most_recent_fd; + +/* + Send buffer for storing the data and to be transmitted thru UDP + The format: + (5*sizeof(int) bytes header) + (PAGE_SIZE data area) + + The header has five int_type (4 bytes) int's. + (1) mode -- for master to give instructions to the target machines. + (2) current file index (starting with 1) + (3) current page index (starting with 1) -- gee! + (4) bytes to be sent in this page via UPD + (5) total number of pages for this file + + data_ptr points to the data area. +*/ +int *mode_ptr; /* char type would cause alignment problem on Sparc */ +int *current_file_ptr; +int *current_page_ptr; +int *bytes_sent_ptr; +int *total_pages_ptr; +char *data_ptr; +char *fill_here; +char send_buff[PAGE_BUFFSIZE]; + +/* Send socket */ +int send_fd; +#ifndef IPV6 +struct sockaddr_in send_addr; +#else +struct sockaddr_in6 send_addr; +#endif + +/* for final statistics */ +extern unsigned int total_pages; +extern off_t total_bytes; +off_t real_total_bytes; +unsigned int real_total_pages; + +/* + set_mode sets the caster into a new mode. + modes are defined in main.h: +*/ +void set_mode(int new_mode) +{ + /* 20060323 convert it to network byte order */ + *mode_ptr = htonl(new_mode); +} + +/* init_sends initializes the send buffer */ +void init_sends() +{ + most_recent_file = most_recent_fd = -99999; + real_total_bytes = 0; + real_total_pages = 0; + + /* pointers for send buffer */ + mode_ptr = (int *)send_buff; + current_file_ptr = (int *)(mode_ptr+1); + current_page_ptr = (int *)(current_file_ptr + 1); + bytes_sent_ptr = (int *)(current_page_ptr + 1); + total_pages_ptr = (int *)(bytes_sent_ptr + 1); + data_ptr = (char *)(total_pages_ptr + 1); + fill_here = data_ptr; + + /* send socket */ + send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT); + + /******* change MULTICAST_IF ********/ + if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0) + perror("init_sends(): when set MULTICAST_IF\n"); + + /* set multicast_ttl such that UDP can go to 2nd subnetwork */ + if (mcast_set_ttl(send_fd, my_TTL) < 0) + perror("init_sends(): when set MULTICAST_TTL\n"); + + /* disable multicast_loop such that there is no echo back on master */ + if (mcast_set_loop(send_fd, my_LOOP) < 0) + perror("init_sends(): when set MULTICAST_LOOP\n"); +} + +void clear_send_buf() +{ + fill_here = data_ptr; +} + +/* + put file contents into send (UDP) buffer + return the number of bytes put into the buffer. +*/ +ssize_t pack_page_for_file(int page) +{ + if(verbose>=2) + fprintf(stderr, "Sending page %d of file %d\n", page, current_entry()); + + /*** + Adjust the position for reading + NOTE:if the type (off_t) is not given, the large file operation + would fail. + ***/ + lseek(most_recent_fd, (off_t)PAGE_SIZE * (off_t)(page - 1), SEEK_SET); + + /* read it and put the content into send_buff */ + /* the max number this read() will return is PAGE_SIZE */ + return read(most_recent_fd, data_ptr, PAGE_SIZE); +} + +int fexist(int entry) +{ + if (entry != most_recent_file) { + if (most_recent_fd > 0) close(most_recent_fd); /* make sure we close it */ + + if((most_recent_fd = open(getFullname(), O_RDONLY, 0)) < 0){ + perror(getFullname()); + } + most_recent_file = entry; + } + return (most_recent_fd >= 0); /* <0 means FAIL */ +} + + +/* + send_buffer will send the buffer with the file information + out to the socket connection with the catcher. + return 0 -- ok, -1 sent failed. +*/ +int send_buffer(int bytes_read) +{ + /* send the data */ + if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE, + 0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) { + perror("Sending packet"); + return FAIL; + } + return SUCCESS; +} + +/* + send_page takes a page from the current file and controls + sending it out the socket to the catcher. It calls send_buffer + to do the actuall call to sendto. +*/ +int send_page(int page) +{ + unsigned int bytes; + + if (verbose>=2) fprintf(stderr, "In send_page()\n"); + + if (file_changed) { + total_bytes -= ((page=3) + fprintf(stderr, "Sending page=%d of %d in file %d of %d\n", + page, nPages, + cur_entry, total_entries()); + return send_buffer(bytes); +} + +/* + send_test zeroes out the buffers going to the catcher + and thereby sends a test packet to the catcher. +*/ +void send_test() +{ + *mode_ptr = htonl(TEST); /* --> network byte order */ + + *current_file_ptr = 0; + *current_page_ptr = 0; + *total_pages_ptr = 0; + *bytes_sent_ptr = 0; + + send_buffer(0); + fprintf(stderr, "Test packet is sent.\n"); +} + +void send_cmd(int code, int machine_id) +{ + /* + Except for OPEN_FILE_CMD, + only the header area in the send_buff gets filled + with data. + */ + *mode_ptr = htonl(code); /* --> network byte order */ + *current_page_ptr = htonl(machine_id); /* -1 being all machines */ + *current_file_ptr = htonl(cur_entry); + *total_pages_ptr = htonl(nPages); + + *bytes_sent_ptr = (code == OPEN_FILE_CMD) ? htonl(fill_here - data_ptr) :0; + + /* do the header in send_buf */ + send_buffer((code == OPEN_FILE_CMD) ? fill_here - data_ptr : 0); + + /* print message */ + if (verbose>=2) { + fprintf(stderr, "cmd [%s] sent\n", cmd_name[code]); + } +} + +void pack_open_file_info() +{ + /* prepare udp send_buffer for file info + (header) (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0 + */ + clear_send_buf(); + fill_here += fill_in_stat(data_ptr); + fill_here += fill_in_filename(fill_here); + if (is_softlink() || is_hardlink()) { + fill_here += fill_in_linktar(fill_here); + } +} + +void send_all_done_cmd() +{ + int i; + *mode_ptr = htonl(ALL_DONE_CMD); /* --> network byte order */ + + *current_file_ptr = 0; + *current_page_ptr = 0; + *total_pages_ptr = 0; + *bytes_sent_ptr = 0; + + for(i=0; i<10; ++i) { /* do it many times, in case network is busy */ + send_buffer(0); + usleep(DT_PERPAGE*10); + } + /* NOTE: it is still possible that ALL_DONE msg could not + be received by targets. + For total robustness, some independent checking on targets + should be done. + */ + fprintf(stderr, "(ALL DONE)\n"); +} + +void my_exit(int good_or_bad) +{ + if (send_fd) send_all_done_cmd(); + exit(good_or_bad); +} diff --git a/set_catcher_mcast.c b/set_catcher_mcast.c new file mode 100644 index 0000000..9a04017 --- /dev/null +++ b/set_catcher_mcast.c @@ -0,0 +1,142 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + + This file collects functions related to setting multicast + for multicatcher. They are IPv4 and IPv6 ready. + By default, we use IPv4. + To use IPv6, we need to specify -DIPv6 in Makefile. + The functions in this file are collected from + Richard Stevens' Networking bible: Unix Network programming + + I added Mcast_join(). + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#ifdef _SUN +#include /* define SIOCGIFADDR */ +#else +#include +#endif + +#define SA struct sockaddr + +int mcast_join(int sockfd, const SA *sa, socklen_t salen, + const char *ifname, u_int ifindex) +{ + switch (sa->sa_family) { + case AF_INET: { + struct ip_mreq mreq; + struct ifreq ifreq; + + memcpy(&mreq.imr_multiaddr, + &((struct sockaddr_in *) sa)->sin_addr, + sizeof(struct in_addr)); + + if (ifindex > 0) { + if (if_indextoname(ifindex, ifreq.ifr_name) == NULL) { + errno = ENXIO; /* i/f index not found */ + return(-1); + } + goto doioctl; + } else if (ifname != NULL) { + strncpy(ifreq.ifr_name, ifname, IFNAMSIZ); +doioctl: + if (ioctl(sockfd, SIOCGIFADDR, &ifreq) < 0) + return(-1); + memcpy(&mreq.imr_interface, + &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr, + sizeof(struct in_addr)); + } else + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + + return(setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mreq, sizeof(mreq))); + } + +#ifdef IPV6 + case AF_INET6: { + struct ipv6_mreq mreq6; + + memcpy(&mreq6.ipv6mr_multiaddr, + &((struct sockaddr_in6 *) sa)->sin6_addr, + sizeof(struct in6_addr)); + + if (ifindex > 0) + mreq6.ipv6mr_interface = ifindex; + else if (ifname != NULL) + if ( (mreq6.ipv6mr_interface = if_nametoindex(ifname)) == 0) { + errno = ENXIO; /* i/f name not found */ + return(-1); + } + else + mreq6.ipv6mr_interface = 0; + + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, + &mreq6, sizeof(mreq6))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} + +int Mcast_join(int sockfd, const char *mcast_addr, + const char *ifname, u_int ifindex) +{ + #ifndef IPV6 + /* IPv4 */ + struct sockaddr_in sa; + sa.sin_family = AF_INET; + inet_pton(AF_INET, mcast_addr, &sa.sin_addr); + #else + struct sockaddr_in6 sa; + sa.sin6_family = AF_INET6; + inet_pton(AF_INET6, mcast_addr, &sa.sin6_addr); + #endif + + return (mcast_join(sockfd, (struct sockaddr *) &sa, sizeof(sa), + ifname, ifindex)); +} + +void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr) +{ + switch (sa->sa_family) { + case AF_INET: { + struct sockaddr_in *sin = (struct sockaddr_in *) sa; + + memcpy(&sin->sin_addr, addr, sizeof(struct in_addr)); + return; + } + + #ifdef IPV6 + case AF_INET6: { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *) sa; + + memcpy(&sin6->sin6_addr, addr, sizeof(struct in6_addr)); + return; + } + #endif + } + + return; +} + diff --git a/set_mcast.c b/set_mcast.c new file mode 100644 index 0000000..aac3d36 --- /dev/null +++ b/set_mcast.c @@ -0,0 +1,160 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + + This file collects functions related to setting multicast + for multicaster. They are IPv4 and IPv6 ready. + By default, we use IPv4. + To use IPv6, we need to specify -DIPv6 in Makefile. + The functions in this file are collected from + Richard Stevens' Networking bible: Unix Network programming + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include +#ifdef _SUN +#include /* define SIOCGIFADDR */ +#else +#include +#endif + +#define SA struct sockaddr +#define MAXSOCKADDR 128 /* max socket address structure size */ + +int sockfd_to_family(int sockfd) +{ + union { + struct sockaddr sa; + char data[MAXSOCKADDR]; + } un; + socklen_t len; + + len = MAXSOCKADDR; + if (getsockname(sockfd, (SA *) un.data, &len) < 0) + return(-1); + return(un.sa.sa_family); +} + +int mcast_set_if(int sockfd, const char *ifname, u_int ifindex) +{ + switch (sockfd_to_family(sockfd)) { + case AF_INET: { + struct in_addr inaddr; + struct ifreq ifreq; + + if (ifindex > 0) { + if (if_indextoname(ifindex, ifreq.ifr_name) == NULL) { + errno = ENXIO; /* i/f index not found */ + return(-1); + } + goto doioctl; + } else if (ifname != NULL) { + strncpy(ifreq.ifr_name, ifname, IFNAMSIZ); +doioctl: + if (ioctl(sockfd, SIOCGIFADDR, &ifreq) < 0) + return(-1); + memcpy(&inaddr, + &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr, + sizeof(struct in_addr)); + } else + inaddr.s_addr = htonl(INADDR_ANY); /* remove prev. set default */ + + return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_IF, + &inaddr, sizeof(struct in_addr))); + } + +#ifdef IPV6 + case AF_INET6: { + u_int index; + + if ( (index = ifindex) == 0) { + if (ifname == NULL) { + errno = EINVAL; /* must supply either index or name */ + return(-1); + } + if ( (index = if_nametoindex(ifname)) == 0) { + errno = ENXIO; /* i/f name not found */ + return(-1); + } + } + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_IF, + &index, sizeof(index))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} + +int mcast_set_loop(int sockfd, int onoff) +{ + switch (sockfd_to_family(sockfd)) { + case AF_INET: { + u_char flag; + + flag = onoff; + return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_LOOP, + &flag, sizeof(flag))); + } + +#ifdef IPV6 + case AF_INET6: { + u_int flag; + + flag = onoff; + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, + &flag, sizeof(flag))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} +/* end mcast_set_loop */ + +int mcast_set_ttl(int sockfd, int val) +{ + switch (sockfd_to_family(sockfd)) { + case AF_INET: { + u_char ttl; + + ttl = val; + return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, + &ttl, sizeof(ttl))); + } + +#ifdef IPV6 + case AF_INET6: { + int hop; + + hop = val; + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, + &hop, sizeof(hop))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} + diff --git a/setup_socket.c b/setup_socket.c new file mode 100644 index 0000000..3625a2a --- /dev/null +++ b/setup_socket.c @@ -0,0 +1,242 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + make it IPv6-ready + Copyright (C) 2005 Renaissance Technologies Corp. + file name is changed from main.c to setup_socket.c + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + + + Copyright (C) 2000 Aaron Hillegass + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ +/* + This part was based on + (1) codes by Aaron Hillegass + (2) codes in Steven's book 'network programming' + + 200605 change it to make it IPv6 ready + +*/ + +#include +#include +#include +#include +#include +#include /* sockaddr_in{} and other Internet defns */ +#include /* inet(3) functions */ +#include + +extern int verbose; +int delay_sec; +int delay_usec; + + +/* Set up for catcher's complaint socket */ +#ifndef IPV6 +int complaint_socket(struct sockaddr_in *addr, int port) +#else +int complaint_socket(struct sockaddr_in6 *addr, int port) +#endif +{ + int fd, sockaddr_len; + sa_family_t family; + if (verbose>=2) fprintf(stderr, "in send_socket_ip\n"); + + #ifndef IPV6 + /* IPv4 */ + sockaddr_len = sizeof(struct sockaddr_in); + memset(addr, 0, sockaddr_len); + addr->sin_family = AF_INET; + family = AF_INET; + addr->sin_port = htons(port); + /*addr->sin_addr.s_addr = ip; this fx in for init process only + This ip will be overwritten later after + 1st packet is received. */ + #else + /* IPv6 */ + sockaddr_len = sizeof(struct sockaddr_in6); + memset(addr, 0, sockaddr_len); + addr->sin6_family = AF_INET6; + family = AF_INET6; + addr->sin6_port = htons(port); + /* addr->sin_addr.s_addr = ip; see comments above */ + #endif + + if ((fd = socket(family, SOCK_DGRAM, 0)) < 0){ + perror("Send socket"); + exit(1); + } + return fd; +} + +/* Set up mcast send socket for multicaster based on (char*)cp */ +#ifndef IPV6 +int send_socket(struct sockaddr_in *addr, char * cp, int port) +#else +int send_socket(struct sockaddr_in6 *addr, char * cp, int port) +#endif +{ + int fd, sockaddr_len; + sa_family_t family; + char buf[50]; + if (verbose>=2) fprintf(stderr, "in send_socket\n"); + + #ifndef IPV6 + /* IPv4 */ + sockaddr_len = sizeof(struct sockaddr_in); + memset(addr, 0, sockaddr_len); + addr->sin_family = AF_INET; + family = AF_INET; + addr->sin_port = htons(port); + /*addr->sin_addr.s_addr = inet_addr(cp);*/ + inet_pton(AF_INET, cp, &addr->sin_addr); + /* Print out IP address and port */ + inet_ntop(AF_INET, &addr->sin_addr, buf, 50); + #else + sockaddr_len = sizeof(struct sockaddr_in6); + memset(addr, 0, sockaddr_len); + addr->sin6_family = AF_INET6; + family = AF_INET6; + addr->sin6_port = htons(port); + /*addr->sin_addr.s_addr = inet_addr(cp);*/ + inet_pton(AF_INET6, cp, &addr->sin6_addr); + /* Print out IP address and port */ + inet_ntop(AF_INET6, &addr->sin6_addr, buf, 50); + #endif + + if (verbose>=2) + fprintf(stderr, "Creating a send socket to %s:%d\n", buf, port); + + if ((fd = socket(family, SOCK_DGRAM, 0)) < 0){ + perror("Send socket"); + exit(1); + } + + if ((bind(fd, (const struct sockaddr *)addr, sockaddr_len)) < 0){ + perror("in send_socket(): bind error (need to change MCAST_ADDR)"); + exit(1); + } + return fd; /*send_socket_ip(addr, address, port);*/ +} + +/* set up socket on the receiving end */ +#ifndef IPV6 +int rec_socket(struct sockaddr_in *addr, int port) +#else +int rec_socket(struct sockaddr_in6 *addr, int port) +#endif +{ + int fd, sockaddr_len; + sa_family_t family; + + #ifndef IPV6 + /* IPv4 */ + sockaddr_len = sizeof(struct sockaddr_in); + memset(addr, 0, sockaddr_len); + addr->sin_family = AF_INET; + family = AF_INET; + addr->sin_port = htons(port); + addr->sin_addr.s_addr = htonl(INADDR_ANY); + #else + sockaddr_len = sizeof(struct sockaddr_in6); + memset(addr, 0, sockaddr_len); + addr->sin6_family = AF_INET6; + family = AF_INET6; + addr->sin6_port = htons(port); + addr->sin6_addr = in6addr_any; /* RS book: page 92 */ + #endif + + if((fd = socket(family, SOCK_DGRAM, 0)) < 0){ + perror("Socket create"); + exit(1); + } + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, NULL, 0); + + if (verbose>=2) + fprintf(stderr, "Creating receive socket on port %d\n", port); + + /*if ((bind(fd, addr, sizeof(*addr))) < 0){ MOD by RWM: replaced by next line */ + if ((bind(fd, (const struct sockaddr *)addr, sockaddr_len)) < 0){ + perror("in rec_socket(): bind error (need to change PORT)"); + exit(1); + } + return fd; +} + +/* + set the values of two variables to be used by select() + in readable(). +*/ +void set_delay(int secs, int usecs) +{ + if (verbose>=2) { + if (usecs == -1) + fprintf(stderr, "Timeout: set to infinite\n"); + else + fprintf(stderr, "Timeout: set to %d sec + %d usec\n", secs, usecs); + } + delay_sec = secs; + delay_usec = usecs; +} + +void get_delay(int * secs, int * usecs) +{ + *secs = delay_sec; + *usecs= delay_usec; +} + +/* + Check if there is an incoming UDP for a certain amount + of time period specified in delay_tv.. + + Return 'true' if there is. + Return 'false' if no valid UDP has arrived within the time period. + + In multicaster, this is used in read_handle_page() right after + send_page() is carried out. As a result, read_handle_complaint() waits + for any incoming complaints from any target machines within + the specified time period. As it is now, no target machine + will send back complaints during the transmission of all the + pages for a file. Therefore, read_handle_complaint() serves effectively + as a time delay between sending of each page. +*/ +int readable(int fd) +{ + struct timeval delay_tv; + fd_set rset; + FD_ZERO(&rset); + FD_SET(fd, &rset); + + /* + if microsec == -1 wait forever for a packet. + This is used in the beginning when multicatcher is just + invoked. + */ + if (delay_usec == -1){ + return(select(fd + 1, &rset, NULL, NULL, NULL)); + } else { + delay_tv.tv_sec = delay_sec; + delay_tv.tv_usec = delay_usec; + return (select(fd + 1, &rset, NULL, NULL, &delay_tv)); + } +} + diff --git a/signal.c b/signal.c new file mode 100644 index 0000000..5b49b52 --- /dev/null +++ b/signal.c @@ -0,0 +1,93 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + The code in this file is copied from + Richard Stevens' book + "UNIX Network Programming" Chap.22.3 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "signal.h" + +Sigfunc * signal(int signo, Sigfunc *func) +{ + struct sigaction act, oact; + + act.sa_handler = func; + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + if (signo == SIGALRM) { +#ifdef SA_INTERRUPT + act.sa_flags |= SA_INTERRUPT; /* SunOS 4.x */ +#endif + } else { +#ifdef SA_RESTART + act.sa_flags |= SA_RESTART; /* SVR4, 44BSD */ +#endif + } + if (sigaction(signo, &act, &oact) < 0) + return(SIG_ERR); + return(oact.sa_handler); +} +/* end signal */ + +Sigfunc * Signal(int signo, Sigfunc *func) /* for our signal() function */ +{ + Sigfunc *sigfunc; + + if ( (sigfunc = signal(signo, func)) == SIG_ERR) + perror("signal error"); + return(sigfunc); +} + +int Fcntl(int fd, int cmd, int arg) +{ + int n; + + if ( (n = fcntl(fd, cmd, arg)) == -1) + perror("fcntl error"); + return(n); +} + +int Ioctl(int fd, int request, void *arg) +{ + int n; + + if ( (n = ioctl(fd, request, arg)) == -1) + perror("ioctl error"); + return(n); /* streamio of I_LIST returns value */ +} + +void Sigemptyset(sigset_t *set) +{ + if (sigemptyset(set) == -1) + perror("sigemptyset error"); +} + +void Sigaddset(sigset_t *set, int signo) +{ + if (sigaddset(set, signo) == -1) + perror("sigaddset error"); +} + +void Sigprocmask(int how, const sigset_t *set, sigset_t *oset) +{ + if (sigprocmask(how, set, oset) == -1) + perror("sigprocmask error"); +} diff --git a/signal.h b/signal.h new file mode 100644 index 0000000..bf309f8 --- /dev/null +++ b/signal.h @@ -0,0 +1,32 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include +#include +#include +#include +/******* on linux: ioctl() is defined in sys/ioctl.h instead of unistd.h as on SunOS *****/ +#include +#include + +typedef void Sigfunc(int); /* for signal handlers */ + diff --git a/timing.c b/timing.c new file mode 100644 index 0000000..d4baa15 --- /dev/null +++ b/timing.c @@ -0,0 +1,109 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include +#include + +#define N 501 /* effectively set the maximum time in rtt_hist to be 500 msec */ + +extern int verbose; + +/* for timing */ +struct timeval tv0, tv1; +unsigned long usec_acc, sec_acc; /* accumulator of timing */ +unsigned int rtt_hist[N]; /* rtt_hist[i] = count of rtt within (i, i+1) */ + +void refresh_timer() +{ + usec_acc = 0; + sec_acc = 0; +} + +void start_timer() +{ + struct timezone tz; + gettimeofday(&tv0, &tz); +} + +void end_timer() +{ + struct timezone tz; + gettimeofday(&tv1, &tz); /* end timer -------- */ +} + +void update_time_accumulator() +{ + if (tv1.tv_usec(N-2)) index = N-1; + rtt_hist[index]++; +} + +void pr_rtt_hist() +{ + int i; + fprintf(stderr, "rtt histogram\n"); + fprintf(stderr, "msec counts\n"); + fprintf(stderr, "---- --------\n"); + for(i=0; i10) continue; + if (rtt_hist[i] != 0) { + fprintf(stderr, "%4d %u\n", i, rtt_hist[i]); + } + } +} + +unsigned int pages_wo_ack() +{ + return rtt_hist[N-1]; +} diff --git a/trFilelist.c b/trFilelist.c new file mode 100644 index 0000000..8f65796 --- /dev/null +++ b/trFilelist.c @@ -0,0 +1,449 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +/***this code parse the synclist file generated by rsync in dry-run mode. + The minimum options for rsync is : -avW --dry-run --delete + e.g. + /usr/local/bin/rsync --rsync-path=/usr/local/bin/rsync + -avW --dry-run --delete + /src/path/ dest_machine:/target/path/ > output 2>&1 +A typical output of rsync may look like this: +Client is very old version of rsync, upgrade recommended. +building file list ... done +xyz -> ./sub1/xyz +file1 +fn +fn1 +j +sub1/hardlink_to_file1 +sub1/path/testfile +sub1/xyz +sent 337 bytes read 44 bytes 254.00 bytes/sec +total size is 320751046 speedup is 841866.26 + +This code does the following three things. +(1) skip the lines before 'done' and after 'wrote' +(2) output all directories and file_path + e.g + for an entry: sub1/path/testfile, the output is + sub1 + sub1/path + sub1/path/testfile +(3) xyz -> ./sub1/xyz + the output is + xyz +(4) If file1 and sub1/hardlink are hardlinked + the output is + file1 + sub1/hardlink file1 + +For the example output above, the output of this code is: + +xyz +file1 +fn +fn1 +j +sub1 +sub1/hardlink_to_file1 file1 +sub1/path +sub1/path/testfile +sub1/xyz + +***/ + +#include +#include +#include +#include +#include /* to define PATH_MAX */ +#include + +#define TRUE 1 +#define FALSE 0 + +struct string_list { + int capacity; + char ** endp; + char ** str; +}; + +void init_string_list(struct string_list * str_ptr, int n) +{ + str_ptr->str = malloc(n * sizeof(void*)); + str_ptr->capacity = n; + str_ptr->endp = str_ptr->str; +} + +void grow_string_list(struct string_list * slp) +{ + int new_capacity = 2 * slp->capacity; + char ** old_ptrs = slp->str; + char ** new_ptrs; + char ** newp = malloc(new_capacity * sizeof(void *)); + new_ptrs = newp; + + while (old_ptrs < slp->endp) { + *new_ptrs++ = *old_ptrs++; + } + + free(slp->str); + slp->str = newp; + slp->endp= new_ptrs; + slp->capacity = new_capacity; +} + +void append_string_list(char * str, struct string_list * slp) +{ + if (slp->endp - slp->str == slp->capacity) grow_string_list(slp); + *slp->endp = strdup(str); + (slp->endp)++; +} + +/*************** change to return index ****/ +int find_string(char * str, struct string_list * slp) +{ + /* find if str is in the list */ + int i; + int n = slp->endp - slp->str; + + for(i=0; istr)[i], str)==0) return i; + } + return -1; +} + +/* find if a string in string-list is a sub-string of str */ +int has_sub_string(char * str, struct string_list *slp) +{ + int i; + int n = slp->endp - slp->str; + + for(i=0; istr)[i], strlen((slp->str)[i]))==0) return i; + } + return -1; +} + +/* find if the str is a substr of those in slp */ +int has_newdir(char *str, struct string_list *slp) +{ + int i; + int n = slp->endp - slp->str; + + for(i=0; istr)[i],str, strlen(str))==0) return i; + } + return -1; +} + +struct uint_list { + int capacity; + unsigned int * endp; + unsigned int * d; +}; + +void init_uint_list(struct uint_list * uil_ptr, int n) +{ + uil_ptr->d = malloc(n * sizeof(unsigned int)); + uil_ptr->capacity = n; + uil_ptr->endp = uil_ptr->d; +} + +void grow_uint_list(struct uint_list * uilp) +{ + int new_capacity = 2 * uilp->capacity; + unsigned int * old_ptrs = uilp->d; + unsigned int * new_ptrs; + unsigned int * newp = malloc(new_capacity * sizeof(unsigned int)); + new_ptrs = newp; + + while (old_ptrs < uilp->endp) { + *new_ptrs++ = *old_ptrs++; + } + + free(uilp->d); + uilp->d = newp; + uilp->endp= new_ptrs; + uilp->capacity = new_capacity; +} + +void append_uint_list(unsigned int data, struct uint_list * uilp) +{ + if (uilp->endp - uilp->d == uilp->capacity) grow_uint_list(uilp); + *uilp->endp = data; + (uilp->endp)++; +} +/*************** change to return index ****/ +int find_unit(unsigned int data, struct uint_list * uilp) +{ + /* find if data is in the list */ + int i; + int n = uilp->endp - uilp->d; + + for(i=0; id)[i] == data) return i; + } + return -1; +} + +struct string_list file_list; +struct uint_list ino_list; +struct string_list dir_list; +struct string_list softlink_list; /* for (a) */ +struct string_list newdir_list; /* for (b) */ + +void strip(char * str) +{ + /* remove trailing \n and spaces */ + char *pt; + char *pc = &str[strlen(str)-1]; + while (*pc == ' ' || *pc == '\n') *(pc--) = '\0'; + /* 20080317 remove leading spaces */ + pt = pc = &str[0]; + while (*pc == ' ') ++pc; + if (pc != pt) { + while (*pc != '\0') *pt++ = *pc++; + *pt = '\0'; + } +} + +void output_subs(char * str) +{ + return; /*************************** testing ***************/ + /* to do (2) indicated in the above */ + /******** + char * pc; + char subs[PATH_MAX]; + pc = strstr(str, "/"); + if (!pc) return; + + while (pc) { + strncpy(subs, str, pc-str); + subs[pc-str] = '\0'; + if (find_string(subs, &dir_list)<0) { + printf("%s\n", subs); + append_string_list(subs, &dir_list); + } + pc = strstr(pc+1, "/"); + } + ************/ +} + +/*** (a) + get those softlinks that points to a directory + this is to deal with the following scenario + previous structure + dir_path (a directory) + db (a directory) + + newly updated structure on master + dir_path -> db + db + + rsync --dry-run generates + dir_path -> db [a link is done on target] + deleting dir_path/sub/filename1 [wrong file gets removed ] + deleting dir_path/sub/filename2... + + file_operations.c does this when dir_path -> db is due + delete dir_path (rm -rf) + make the softlink + But then the following delete will have undesired deletion. + + ------------------------------------------------------------ + + (b) + t0 name -> xyz name -> xyz (target) + t1 name/ name -> xyz + + rsync generates + name/ update_directory() won't have effect + name/f1 delivered to wrong place + name/f2 + deleting name too late + ** the deletion should be done before not after. + For now, I will fail this code for this situation. + +***/ +void get_dir_softlinks(char *filename, char * basedir) { + FILE * fd; + char line[PATH_MAX]; + struct stat st; + + if ((fd = fopen(filename, "r")) == NULL) { + fprintf(stderr, "Cannot open file -- %s \n", filename); + exit(-1); + } + + while (1) { /* for each line in the file */ + char *pc; + char fn[PATH_MAX]; + + if (fgets(line, PATH_MAX, fd)==NULL) break; + strip(line); + if (strlen(line) == 0) continue; /* skip blank line */ + + /* the softlink case is indicated by -> */ + pc= strstr(line, " -> "); + if (pc) { /* it is a softlink */ + *pc = '\0'; + /* check if it is a directory */ + sprintf(fn, "%s/%s", basedir, line); + + /* check if the link-target is a directory */ + if (stat(fn, &st)<0) continue; /* We skip this bad entry - no longer exist */ + + if (S_ISDIR(st.st_mode)) { + append_string_list(line, &softlink_list); + } + } else { /* not a softlink --> find if it is a directory */ + /* find a line without ' ' and with trailing '/' */ + pc = strstr(line, " "); /* the first space */ + if (!pc) { + char * plast = &line[0] + strlen(line) - 1; + if (*plast == '/') { + append_string_list(line, &newdir_list); + } + } + } + } + + fclose(fd); +} + + +int main(int argc, char * argv[]) +{ + char * filename; + char * basedir; + FILE *fd; + char line[PATH_MAX]; + + if (argc < 3) { + fprintf(stderr, "Usage: trFilelist synclist_filename basedir\n"); + exit(-1); + } + + filename = argv[1]; + basedir = argv[2]; + + init_string_list(&file_list, 10); + init_uint_list(&ino_list, 10); + init_string_list(&dir_list, 100); + init_string_list(&softlink_list, 10); + init_string_list(&newdir_list, 100); + + get_dir_softlinks(filename, basedir); + + if ((fd = fopen(filename, "r")) == NULL) { + fprintf(stderr, "Cannot open file -- %s \n", filename); + return -1; + } + + while (1) { /* for each line in the file */ + char *pc; + char fn[PATH_MAX]; + struct stat st; + int newdir_flag; + + if (fgets(line, PATH_MAX, fd)==NULL) break; + strip(line); + if (strlen(line) == 0) continue; /* skip blank line */ + if (strcmp(line, ".")==0) continue; + if (strcmp(line, "./")==0) continue; + + /* first we look for deleting entry */ + if (strncmp(line, "deleting ", 9)==0) { + /* deleting (directory) file_path */ + char * p1, *p2, *pf; + + p1 = strstr(line, " "); /* the first space */ + p2 = strstr(p1+1, " "); /* deleting directory filepath * 20070912 this is old */ + pf = (p2) ? p2+1 : p1+1;/* it's always p1+1 */ + + newdir_flag = has_newdir(pf, &newdir_list); + + if ((has_sub_string(pf, &softlink_list)<0) && newdir_flag<0) { + /* see comments above get_dir_softlinks() */ + printf("deleting %s\n", pf); + } else if (newdir_flag>=0) { /* temporary action */ + /*** we can simply skip this block later. 20070912 ***/ + /***/ + fprintf(stderr, "CRITICAL ERROR: An old softlink has been changed to a directory!\n"); + fprintf(stderr, " For now, we crash this code for human intervention\n"); + fprintf(stderr, " line= %s\n", line); + exit(-1); + /***/ + } + + continue; + } + + /* the softlink case is indicated by -> */ + pc= strstr(line, " -> "); + if (pc) { + *pc = '\0'; + output_subs(line); + printf("%s\n", line); + continue; + } + + /* if rsync's -H is turned on, the output may contain + file => tar_hardlink_file (relative address) + */ + pc= strstr(line, " => "); + if (pc) { + *pc = '\0'; + output_subs(line); + printf("%s %s\n", line, pc+4); + continue; + } + + /* the rest of the entries should be valid paths */ + sprintf(fn, "%s/%s", basedir, line); + if (lstat(fn, &st)<0) continue; /* We skip this bad entry - + (1) the header and tail lines + (2) perhaps the file no longer exists */ + + /* is this a hardlink? */ + if (st.st_nlink > 1) { + int index; + output_subs(line); + if ((index = find_unit((unsigned int)st.st_ino, &ino_list))<0) { + append_uint_list((unsigned int)st.st_ino, &ino_list); + append_string_list(line, &file_list); /* relative path */ + printf("%s\n", line); + } else { + printf("%s %s\n", line, file_list.str[index]); + } + continue; + } + + /* all others */ + output_subs(line); + printf("%s\n", line); + } /* end of one line */ + + fclose(fd); + return 0; +} -- cgit v1.2.3-70-g09d2