diff options
| -rw-r--r-- | GPL.License | 280 | ||||
| -rw-r--r-- | Makefile | 70 | ||||
| -rw-r--r-- | Makefile.Sun | 71 | ||||
| -rw-r--r-- | README | 302 | ||||
| -rw-r--r-- | README.more | 353 | ||||
| -rw-r--r-- | backup.c | 108 | ||||
| -rwxr-xr-x | cmdToTarget.py | 43 | ||||
| -rw-r--r-- | complaint_sender.c | 158 | ||||
| -rw-r--r-- | complaints.c | 579 | ||||
| -rw-r--r-- | file_operations.c | 800 | ||||
| -rw-r--r-- | global.c | 35 | ||||
| -rw-r--r-- | id_map.c | 74 | ||||
| -rw-r--r-- | main.h | 189 | ||||
| -rwxr-xr-x | mrsync.py | 284 | ||||
| -rw-r--r-- | mrsync_config.py | 67 | ||||
| -rw-r--r-- | multicaster.c | 582 | ||||
| -rw-r--r-- | multicatcher.c | 181 | ||||
| -rw-r--r-- | page_reader.c | 426 | ||||
| -rw-r--r-- | parse_synclist.c | 320 | ||||
| -rw-r--r-- | proto.h | 182 | ||||
| -rw-r--r-- | rtt.c | 258 | ||||
| -rw-r--r-- | rttcatcher.c | 118 | ||||
| -rw-r--r-- | rttcomplaint_sender.c | 103 | ||||
| -rw-r--r-- | rttcomplaints.c | 270 | ||||
| -rw-r--r-- | rttmain.h | 126 | ||||
| -rw-r--r-- | rttmissings.c | 93 | ||||
| -rw-r--r-- | rttpage_reader.c | 188 | ||||
| -rw-r--r-- | rttproto.h | 116 | ||||
| -rw-r--r-- | rttsends.c | 144 | ||||
| -rw-r--r-- | sends.c | 329 | ||||
| -rw-r--r-- | set_catcher_mcast.c | 142 | ||||
| -rw-r--r-- | set_mcast.c | 160 | ||||
| -rw-r--r-- | setup_socket.c | 242 | ||||
| -rw-r--r-- | signal.c | 93 | ||||
| -rw-r--r-- | signal.h | 32 | ||||
| -rw-r--r-- | timing.c | 109 | ||||
| -rw-r--r-- | trFilelist.c | 449 |
37 files changed, 8076 insertions, 0 deletions
diff --git a/GPL.License b/GPL.License new file mode 100644 index 0000000..960fe74 --- /dev/null +++ b/GPL.License @@ -0,0 +1,280 @@ + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc. + 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Library General Public License instead.) You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. 
+ + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. + + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. 
The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. 
(Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. 
You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. (This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. 
+ +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties to +this License. + + 7. 
If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. 
If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. 
EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + + END OF TERMS AND CONDITIONS diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9de65aa --- /dev/null +++ b/Makefile @@ -0,0 +1,70 @@ +# ----- start of system dependent section ----- + +INSTALL = cp -p + +SUNFLAG = # -D_SUN on Solaris machines +IPV6FLAG = # -DIPV6 for IPv6 +DEBUG = # -g -ggdb +CFLAGS = -O ${DEBUG} -Wall ${SUNFLAG} ${IPV6FLAG} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 +#LIBS = -lsocket # for Solaris +LIBS = # there is no special lib needed, unless your system put the lib in non-standard place + +# The directory to install mrsync and others in. 
+bindir = /usr/local/bin + +# ----- end of system dependent section ------- + +CLEANFILES = *.o *~ + +PROGS = multicaster multicatcher rtt rttcatcher trFilelist +SCR = mrsync.py mrsync_config.py cmdToTarget.py +OBJ1 = multicaster.o multicatcher.o \ + parse_synclist.o sends.o complaints.o \ + complaint_sender.o page_reader.o file_operations.o backup.o \ + set_catcher_mcast.o set_mcast.o + +OBJ4 = rtt.o rttsends.o rttcomplaints.o \ + rttcatcher.o rttpage_reader.o rttcomplaint_sender.o rttmissings.o + +all: ${PROGS} + +install: ${PROGS} + ${INSTALL} ${PROGS} ${SCR} ${bindir} + +# common files +signal.o: signal.h + +# multicasting +${OBJ1}: main.h proto.h + +multicaster: multicaster.o global.o setup_socket.o set_mcast.o \ + parse_synclist.o \ + sends.o complaints.o backup.o \ + timing.o signal.o id_map.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +multicatcher: multicatcher.o global.o setup_socket.o set_catcher_mcast.o \ + page_reader.o complaint_sender.o \ + file_operations.o signal.o timing.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# for rtt and rttcatcher +${OBJ4}: rttmain.h rttproto.h + +rtt: rtt.o setup_socket.o set_mcast.o \ + rttsends.o rttcomplaints.o timing.o signal.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +rttcatcher: rttcatcher.o setup_socket.o set_catcher_mcast.o \ + rttpage_reader.o rttcomplaint_sender.o rttmissings.o \ + signal.o timing.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# misc +trFilelist: trFilelist.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# to clean up +clean: + rm -f ${PROGS} ${CLEANFILES} + diff --git a/Makefile.Sun b/Makefile.Sun new file mode 100644 index 0000000..8fe57da --- /dev/null +++ b/Makefile.Sun @@ -0,0 +1,71 @@ +# ----- start of system dependent section ----- + +INSTALL = cp -p + +SUNFLAG = -D_SUN # -D_SUN on Solaris machines +IPV6FLAG = # -DIPV6 for IPv6 +DEBUG = # -g -ggdb +CC = gcc-2.95.3.ren # 32-bit compiler +CFLAGS = -O ${DEBUG} -Wall ${SUNFLAG} ${IPV6FLAG} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 +LIBS = -lsocket # for Solaris +#LIBS = 
/usr/local/mtools/nova/lib/lea.Linux.o # for monster clusters (32-bit lib) + +# The directory to install mrsync and others in. +bindir = /usr/local/bin + +# ----- end of system dependent section ------- + +CLEANFILES = *.o *~ + +PROGS = multicaster multicatcher rtt rttcatcher trFilelist +SCR = mrsync.py mrsync_config.py cmdToTarget.py +OBJ1 = multicaster.o multicatcher.o \ + parse_synclist.o sends.o complaints.o \ + complaint_sender.o page_reader.o file_operations.o backup.o \ + set_catcher_mcast.o set_mcast.o + +OBJ4 = rtt.o rttsends.o rttcomplaints.o \ + rttcatcher.o rttpage_reader.o rttcomplaint_sender.o rttmissings.o + +all: ${PROGS} + +install: ${PROGS} + ${INSTALL} ${PROGS} ${SCR} ${bindir} + +# common files +signal.o: signal.h + +# multicasting +${OBJ1}: main.h proto.h + +multicaster: multicaster.o global.o setup_socket.o set_mcast.o \ + parse_synclist.o \ + sends.o complaints.o backup.o \ + timing.o signal.o id_map.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +multicatcher: multicatcher.o global.o setup_socket.o set_catcher_mcast.o \ + page_reader.o complaint_sender.o \ + file_operations.o signal.o timing.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# for rtt and rttcatcher +${OBJ4}: rttmain.h rttproto.h + +rtt: rtt.o setup_socket.o set_mcast.o \ + rttsends.o rttcomplaints.o timing.o signal.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +rttcatcher: rttcatcher.o setup_socket.o set_catcher_mcast.o \ + rttpage_reader.o rttcomplaint_sender.o rttmissings.o \ + signal.o timing.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# misc +trFilelist: trFilelist.o + ${CC} ${CFLAGS} -o $@ $^ ${LIBS} + +# to clean up +clean: + rm -f ${PROGS} ${CLEANFILES} + @@ -0,0 +1,302 @@ + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. 
+ main developer: HP Wei <hp@rentec.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +20060416 version 3.0 + This is a major update from previous version 2.1. New features include: + -- large file support + multicaster can now transfer file whose size is larger than 2**31-1. + -- platform independence (between linux, unix) + We have tested this independence from a Sun to a Linux machine. + -- catching slow machine as the feedback monitor dynamically + This significantly reduces the number of occurrences of SIT-OUT-RECEIVING-FILE cases. + -- removing meta-file-info + Since our basic algorithm is different from that of Aaron's, the meta-file-info + is redundant in our case. + -- backup feature (as in rsync) + If the target machines are NFS data servers, backup feature is absolutely + necessary. The feature is implemented in the low level multica* codes, + I will incorporate it into mrsync.py in the next month or so. + -- mcast IP address and port options + They are now tested and are working. + This allows multiple multicasting sessions running simultaneously.
+ -- code cleanup in both mrsync.py and *.c + -- code fixing for 64bit arch; + tested on Debian 64 bit arch by Nicolas Marot in France + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwarranted SIT-OUT case + -- code provision for IPv6 (but not tested yet) +version 3.1.1 + -- minor bug fixes. + -- add checking in multicatcher to check if the + system is ready for write. + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. +version 3.2.0 + -- monitor change improvement + -- handshake improvement (e.g. seq #) + -- if one machine skips a file, all will NOT close() +version 4.0.0 + -- previously, missing page report is sent back one page + at a time. I change it to ~60K pages at a time. This + drastically reduces the network traffic. + Performance (syncing time) is in general improved. + -- clean up scheme in choosing a monitor machine. + -- misc code cleaning up and minor adjustments. + +version 4.0.1 + -- Dave Close at ThalesGroup in CA points out + the type of c in "c = getopt()" should have been "int c" + [ It had been "char c". ] + +The mrsync package includes the following files: +-------------------- +README -- this file (usage example at the end.) +README.more -- another readme.
+GPL.License -- legal stuff +mrsync.py -- the driver script that glues everything together +mrsync_config.py -- system dependent configuration files +cmdToTarget.py -- module to send command to remote machines +Makefile.Sun +Makefile.linux +backup.c +complaint_sender.c +complaints.c +file_operations.c +global.c +id_map.c +multicaster.c +multicatcher.c +page_reader.c +parse_synclist.c +rtt.c +rttcatcher.c +rttcomplaint_sender.c +rttcomplaints.c +rttmissings.c +rttpage_reader.c +rttsends.c +sends.c +set_catcher_mcast.c +set_mcast.c +setup_socket.c +signal.c +timing.c +trFilelist.c +main.h +rttproto.h +proto.h +signal.h +rttmain.h + +------------ +To install, +------------ +(1) copy these files to a directory on the target and + master machines. + (All target machines need at least a copy of the + multicatcher.) + Or the best (efficient) solution is to install the package + on a common file server to which all machines can access. +(2) Depending on which platform you are in, + mv Makefile.Sun (or Makefile.linux) into Makefile. +(3) edit the system dependent section in Makefile. + Especially, the bindir variable. + It is used in the 'install' to copy the executable + binaries into the destination directory. +(4) make + to compile and generate the executables. (the warnings for format are harmless). +(5) make install + to copy the executables into $bindir. +(6) edit mrsync_config.py + change the paths for your system. + mrsync_config.py is read by mrsync.py +(7) check if your system has rsync installed + in the directory specified in rsyncPath and remote_rsyncPath + in the file mrsync_config.py +(8) check if your system has python installed. + Python, like perl, is included in most linux distributions. + As distributed, mrsync.py invokes python through the 1st line + #!/usr/bin/env python + If this does not work for your system, replace + the 1st line of mrsync.py with the correct one.
+ +------------------------------ +Usage of mrsync.py +------------------------------ + +(1) the function of ping -- + This is removed from multicaster. + Instead, please use rtt which gives more useful info. + See 'Usage of rtt' below. + +(2) mrsync.py -m /dirX/target.list + -s /dirY/data + -t /dirZ/data + + This is a routine job carried out at Renaissance Technologies Corp. + The file target.list lists the names of all target machines (one name per line). + + Before carrying out multicasting, mrsync.py invokes rsync, + to find those files in the source directory (-s), + compared with the target directory (-t) in the first target machine, + that need to be transferred to all target machines. + If -t is not specified, then mrsync assumes that the dest_path + on target machines is the same as that in -s. + + mrsync.py uses rsync with the following minimum options: + --rsh=rsh -avW --dry-run --delete + This should work in most routine cases. + This minimum option is specified in mrsync_config.py. + I did not test if other combination of options will work + so I would advise that you do not change the minimum option. + + However, you can add additional options on top of that minimum + options. Use '-o' in mrsync.py + e.g. + mrsync2.py -m list -s /path/sync/test -t /dest/sync/test -v 1 -o '-H' + This turns on --hard-links option in rsync. But hardlink is very tricky + to deal with. The minimum options supplied in mrsync_config.py is the one + that we find works pretty good in terms of dealing with hardlinks. + I do find that some rare sequence of syncing sessions requires that I repeat + another round of mrsync.py with additional + -o '-H' to set the hardlinks on the targets. + + The output of rsync is stored in a tmp file, which in turn gets + parsed by trFilelist. trFilelist translates the list of files into + a specific format for multicaster. + The reason for this two step process is to remove direct dependence of + multicaster on rsync.
+ The gain is that we can now do the case as shown in (3). + +(3) mrsync.py -m machine1,machine2 + -s /dirB/data/ + -l *.c,subs/foo*,sub2/path/[0-9][0-9].ext + + Here we have two new things as compared with the case in (2). + First, the two target machines are listed directly + in -m option in the form of csv list + (instead of listing them in a machine_list file). + Second, the -l option is specified. + -l option lists the files that we want to be synced. + i.e. with this option, we bypass the rsync. + In this example, we want to sync + all /dirB/data/*.c files, + all /dirB/data/subs/foo* files, + and all /dirB/data/sub2/path/[0-9][0-9].ext + The pattern specified should be as those recognizable by 'ls'. + + Obviously, this usage is convenient for syncing a small number of + specific files to a couple of machines. + +(4) mrsync.py -m /dirA/target_list + -s /dirB/data + -A 239.255.70.88 + -P 8010 + + Same functionality as in (2). + But here we specify the multicast IP address and port number + for use by both multicatcher and multicaster. + + This allows multiple mrsync sessions to run simultaneously + as long as each session uses its own mcast_address and port_number. + [ No overlap between sessions. ] + +(5) mrsync.py -m /dirA/target_list + -s /path + -X + Same functionality as in (2). + But (2) is a normal sync. This one invokes the -X. + In a normal sync, a given file (let's say /path/foo) is synced + with two steps, + (a) sync the file to /path/foo.tmp + (b) mv /path/foo.tmp /path/foo + With -X option, the same file is synced with three steps, + (a) rm /path/foo + (b) sync the file to /path/foo.tmp + (c) mv /path/foo.tmp /path/foo + This is useful when the target disk is almost full, and + could not afford to have foo.tmp and foo co-existing. + +(6) By default, mrsync.py prints out only very essential messages.
+ If you want to see more messages (related to some detailed status including + a rtt_histogram at the end of the session), + you can specify '-v 1' in the mrsync.py command line argument list. + '-v 2' will print out even more detailed messages --- for debugging purpose! + +(7) other options of mrsync.py + type 'mrsync.py' and see them for yourself. + +-------------------------------------------------------------------------------- +Usage of rtt +---------------- +rtt is a utility for testing between master and one machine. +Its main function is to report the (histogram) distribution of the +round-trip-time interval for pages sent. +I use it to ping a machine +and to measure the typical rtt time and to see if the network is +connected properly. System administrators can use the rtt time info +to tune up the network. + +Example usage: + +(1) on master, I invoke rtt and it gives the report as follows. + +master:~ 1>rtt -r ssh -m target -n 1000 -s 64000 + +using ssh to invoke rttcatcher on target +Sun Apr 16 17:36:58 EDT 2006 +ssh target '/path/rttcatcher -a 239.255.67.200 -p 7900 1>/dev/null 2>/dev/null & echo $!' +remote pid = 27283 +Sending data... +Missing pages = 0 ( 0.00%) out of total pages = 1000 +rtt histogram +msec counts +---- -------- + 1 989 + 2 3 + 3 7 + 4 1 + + +The option -m specifies the name of the target machine. +-n specifies the number of pages that we want to send. +-s specifies the size of each page. (maximum = 64512) + +In this example, the total number of pages sent is 1000 +which is the sum of the counts in the rtt histogram table. +The histogram shows that the majority of rtt is around 1-2 msec. +If the network is really busy, the distribution of the histogram +will be broadened (instead of concentrated around one particular value). + +(2) more options for rtt. + type 'rtt' and see them on the screen. + +-------------------------------------------------------------------------------- + +HP Wei +hp@rentec.com +Renaissance Technologies Corp.
+ + diff --git a/README.more b/README.more new file mode 100644 index 0000000..3a8f7e1 --- /dev/null +++ b/README.more @@ -0,0 +1,353 @@ + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + ++------------+ +| CHANGE LOG | ++------------+ +Jul - Oct 2008 -- version 4.0 + -- previously, missing page report is sent back one page + at a time. I change it to ~60K pages at a time. This + drastically reduces the network traffic. + -- clean up scheme in choosing a monitor machine. +Feb - May 2006 -- version 3.0 + version 3.0.0 major update + -- large file support + -- platform independence (between linux, unix) + -- backup feature (as in rsync) + -- removing meta-file-info + We transmit the file_stat info right before + the file is being synced. And the file-stat info + is transferred by ascii format to avoid byte-ordering + translation. + The original meta_data is not necessary under this scheme. + -- catching slow machine as the feedback monitor + [ the signal_handler scheme in version 2.0 is + removed. 
] + -- mcast options + version 3.0.[1-9] bug fixes + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwarranted SIT-OUT cases. + -- tested on Debian 64-bit arch by Nicolas Marot in France + Nicolas also translated mrsync.py to mrsync.sh + (nicolas.marot@gmail.com) + version 3.1.0 + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. + +June 9, 2005 -- version 2.1 + Clint Byrum at clint@careercast.com asks about the exit status + of multicaster, which is changed so that it exits whenever + there are bad_machines or files_sat_out. +May 2005 -- Second version + The major changes are: + (1) implementing congestion control so that mrsync is more + net-traffic-friendly. + (2) using python script as the glue to put everything together. + i.e. the previous mrsync.c is replaced by mrsync.py + Other changes collected over the years through interactions with + users include: + (1) adding options to change the default IP address for multicast, + and the default port number for flow control. + This was suggested by Robert Dack <robd@kelman.com> + (2) replacing memory mapped file IO with the usual + seek() and write() sequence. + This change was echoed by Clint Byrum <clint@careercast.com> + (3) adding verbose control so that by default mrsync prints + only essential info instead of detailed status report. + This was suggested by Clint Byrum <clint@careercast.com> + +Jan 2002 -- First version uploaded to http://freshmeat.net/projects/mrsync/ + The tar file is in ftp://felder.org/pub/mrsync.tar + ++-----------------+ +| MRSYNC vs RSYNC | ++-----------------+ +mrsync is a package that consists of utilities for transferring +a bunch of files from a master machine to multiple target +machines simultaneously +by using the multicasting capability in the UNIX system. +The name 'mrsync' is inspired by the +popular utility 'rsync' for synchronizing files between +two machines. 
However, beyond this similarity in the +functionality, mrsync is fundamentally different from rsync +in three areas. +(1) rsync uses TCP while mrsync needs UDP in order + to use the multicasting part of UNIX's socket communication. + The former limits the data communication to one-to-one-machine + whereas the latter allows one-to-many. + UDP has, however, no built in flow control. As a result, + the major part of mrsync + (more precisely, the multicaster and multicatcher), + is devoted to synchronizing the data flow. +(2) For a given file, + rsync transfers optionally only those parts in the file + that are different + in the two versions on the master and the target machine. + This saves time, esp on a slow network. + mrsync, in contrast, transfers the whole content of a file + to all targets at one time. The time gain of mrsync comes from + its concurrency as described in (3). +(3) Replicated data servers have become popular to serve + thousands of clients. Using rsync to replicate the data + to hundreds of disks is very time consuming. The total + time it takes is proportional to the number of target + disks. In contrast, mrsync scales much better because + it puts the contents of the file on the network only one + time. e.g. we have used mrsync to transfer 140GB of data + to 100 machines in a 1Gbit LAN in about 4-6 hours. + +This project started in 2001. We have been using the tool +every day. +Recently, I have upgraded this tool to handle large-files, +to make it platform independent, and to make it 64bit ready. +The current version is 4.0.0 which is stable. +It has been used by many people in the UNIX community. +Lately, people in France have helped me test the code on 64bit arch. +They are proposing to make it a standard package in the Debian distribution. + +mrsync was originally posted on Freshmeat.net. But the link there +points to one of our company's disks. We would like to find a place +on SourceForge.net for this package. 
+ + + ++-------------------+ +| HISTORY OF MRSYNC | ++-------------------+ +The project of mrsync stemmed from the necessity to transfer +many files to hundreds of machines running Linux at Renaissance +Technologies Corp. Looking into the Open Source Community, we found +a preliminary utility codes of multicasting written by Aaron Hillegass. +Many unsuccessful test-runs on a huge amount of data files, however, +led us to embark on an overhaul of the code. +Most of the following items were inherited and bug-fixed from +the original codes. +* The low level functions that + interact with UNIX's multicasting sockets. +* Meta_data -- the essential info about a file which the master + machine will first transmit to the target machines. + [ removed in 2006 (see above change log)]. +* Division of a file into many 'pages'. +* The idea of maintaining a missing page flag. +* The idea of a multicaster and multicatcher loop. + +In this mrsync, we develop two new critical elements: +flow-control message communication conducted by the multicaster, +and a four-state page reader (processor) in the multicatcher. +The former is to synchronize the task each machine is performing. +For example, the master will not start sending +the pages of a file unless all machines have acknowledged +the completion of opening the disk i/o for the file. +In order to accommodate these elements, the codes have been +changed significantly from the original version. +For example, the multicatcher now never asks for slowing down. +And multicaster sends data on a file-by-file basis. +The file integrity is achieved by orchestrating the +data flow which is closely monitored and conducted +by the master machine. + +[200505] we add congestion control into the code. +After the master sends one page for a file, it will not send +the next page until it receives the acknowledgement (ack) message +sent by a monitor target (a feedbacker if you will). 
+This simple-minded congestion control prevents sending pages +at a pace that a busy network cannot digest. +The feedbacker is chosen at the initialization stage of multicaster. +It may be changed by multicaster whenever the feedbacker fails to send back +ack message within a certain duration. + +[2006] The monitor machine will send back ack message +only after it has written the page to disk. This way the 'rtt' +calculated by the master machine includes the disk IO time, +instead of just the network-traffic time. The master also +selects the slowest target machine as the monitor machine. +These two ingredients provide a more precise picture of +how busy the whole system is, and thus tend to leave no target +machine behind. + +As of today, mrsync has been in full use at Renaissance +on a regular basis. + ++-------------------+ +| TEST RUNNING TIME | ++-------------------+ +[200810] using version 4.0.0. The performance improves (if the target machines + are not busy). + Number of targets = 32. All linux machines on 1Gbit network. + +Total number of files = 283 Pages w/o ack = 0 ( 0.00%) +Total number of pages = 100129 Pages re-sent = 6852 ( 6.84%) +Total number of bytes = 6449407171 Bytes re-sent = 442030792 ( 6.85%) +Total time spent = 2.55 (min) ~ 0.37 (min/GB) + +Send pages time = 1.78 (min) ~ 0.26 (min/GB) + +rtt histogram +msec counts +---- -------- + 0 103415 + 1 1985 + 2 162 + 3 73 + 4 145 + 5 68 + 6 253 + 7 203 + 8 305 + 9 184 + 10 156 + +[200604] using version 3.x.x, here is an example of the final statistics + in one of our routine syncing jobs. + Number of targets = 32. All linux machines on 1Gbit network. 
+ +total number of files = 4076 +Total number of pages = 480719 Pages re-sent = 39994 ( 8.32%) +Total number of bytes = 30875638130 Bytes re-sent = 2573924662 ( 8.34%) +Total time spent = 39.50 (min) ~ 1.18 (min/GB) + +rtt histogram +msec counts +---- -------- + 0 371023 + 1 118874 + 2 21977 + 3 3779 + 4 2226 + 5 760 + 6 584 + 7 580 + 8 288 + 9 162 + 10 92 + ... + +[200201] +25 minutes for a group of files whose total size amounts to 4.6GB. +(This data is obtained from running on 5 SUN machines + with Solaris 8 on an Ethernet LAN whose bandwidth is 100Mbits/sec.) + ++-------------------------------------+ +| MAIN ALGORITHM FOR SENDING ONE FILE | ++-------------------------------------+ +In the multicaster code running on the master machine: +In the initialization stage, the code selects one target machine +as a monitor machine (feedbacker), which sends back acknowledgement +message when one page is received. +The main loop for multicaster is as follows. +(1) Send OPEN_FILE command (message). +(2) Wait for all machines to send back acknowledgement (ack). +(3) If any machine does not ack back within a certain time period, + that machine is considered bad and is dropped out + of the monitoring list. + During that waiting time period, the master sends the + OPEN_FILE message to those potentially bad machines regularly. +(4) Start sending pages, + (all pages, in the first time round; + those missing pages reported back by target machines, in other rounds). + During this period, all target machines except the feedbacker, if activated, + just receive and process whatever pages arrive at the receive buffer. + + There are two modes of operation for the master to send these pages. + (a) When the congestion control is turned off (-x option in mrsync.py) + the master sends out one page and waits for a duration (interpage interval), + which is specified as DT_PERPAGE in main.h, before it sends the next one. 
+ (b) If the congestion control is turned on (the default behavior of mrsync.py), + the master will not send the next page until it receives the ack. + On the feedbacker, once a SIGIO is triggered indicating the arrival + of a page, it sends back ack message to the master. +(5) Send EOF message to signal the end of transmission + for this file. +(6) Wait for all machines to send back ack. + They either report ok + or report a list of page numbers that are missing. +(7) If any machine does not ack back within a certain time period, + that machine is considered bad and is dropped out + of the monitoring list. + During that waiting time period, the master sends the + EOF message to those potentially bad machines regularly. +(8) If there are missing pages, go back to (4). +(9) Send CLOSE_FILE message. +(10) Wait for all machines to send back ack. +(11) If any machine does not ack back within a certain time period, + that machine is considered bad and is dropped out + of the monitoring list. + During that waiting time period, the master sends the + CLOSE_FILE message to those potentially bad machines regularly. +(12) If there are more files to transfer, go back to (1). + + +In multicatcher running on the target machine: +the main loop performs the following steps, +(1) Start with IDLE_STATE. +(2) Upon receiving OPEN_FILE and being in IDLE_STATE, + set up a memory mapped temporary file whose size + equals that specified in the meta_data, + which has been received in prior time. + if things are not successful, + don't send back ack. + else + send back ack and enter GET_DATA_STATE. +(3) Upon receiving one UDP and being in GET_DATA_STATE, + store it into the right place in the temporary file. + Expect to receive more UDP data and go back to (3). +(4) Upon receiving EOF and being in GET_DATA_STATE, + if there are some missing pages, + report them and expect to go back to (3). + else there are no missing pages, + send back ack and enter DATA_READY_STATE. 
+ if there is sick conditions, + send back ack and enter SICK_STATE. +(5) Upon receiving CLOSE_FILE and being in DATA_READY_STATE, + if in DATA_READY_STATE, + rename(temporary_file, the_real_filename), + clean up the memory mapped area. + if things go well, + send back the ack, enter IDLE_STATE + and expect to go back to (1), + else don't send back ack. + else if in SICK_STATE, + send back sit_out ack, enter IDLE_STATE + and expect to go back to (1). + +In addition to the above main loop that runs on all target machines, +the selected monitor machine (feedbacker) needs to send back +an received_ack message for every page received and processed. + +-------------------------------------------------------------- +Note: The original version dealt with three types of 'files': + directory, softlink and regular file. + mrsync includes one more: hardlink file. + +HP Wei +hp@rentec.com +Renaissance Technologies Corp. +Nov 15, 2001 (first version) +May 20, 2005 (second version) +Apr 16, 2006 (third version) +Oct 28, 2008 (fourth version) + + + diff --git a/backup.c b/backup.c new file mode 100644 index 0000000..3c59a9d --- /dev/null +++ b/backup.c @@ -0,0 +1,108 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+*/ + +#include "main.h" +#include <regex.h> /* use POSIX in order to be portable to linux */ + +extern int verbose; +int backup = FALSE; +char *pattern_baseDir = NULL; + +/* --------- for syncing all files with/without backup. */ +int nPattern=0; /* number of regular expression for backup files */ +regex_t ** fPatterns;/* array of pointers to regular expression */ + +int set_nPattern(char * fpat_file) +{ + FILE *fd; + char pat[PATH_MAX]; + int count = 0; + if ((fd = fopen(fpat_file, "r"))==NULL) { + fprintf(stderr, "Cannot open file -- %s\n", fpat_file); + return FAIL; + } + while (!feof(fd)) { + if (fscanf(fd, "%s", pat) == 1) { + ++count; + } + } + nPattern = count; + fclose(fd); + return SUCCESS; +} + +int read_backup_pattern(char * fpat_file) +{ + FILE *fd; + char pat[PATH_MAX]; + int i = 0; + + if (!set_nPattern(fpat_file)) return FAIL; + + fPatterns = malloc(sizeof(void *)*nPattern); + for(i = 0; i< nPattern; ++i) { + fPatterns[i] = (regex_t *) malloc(sizeof(regex_t)); + } + + fd = fopen(fpat_file, "r"); + + /* + if we don't prepend ^, + then pattern 'file' intended for files under srcBase + will select files with pattern = subdir/file.* + which is not our intention. 
+ */ + i = 0; + while (!feof(fd)) { + if (fscanf(fd, "%s", pat) == 1) { + char fullpat[PATH_MAX]; + sprintf(fullpat, "^%s", pat); + + regcomp(fPatterns[i], fullpat, REG_EXTENDED|REG_NOSUB); + ++i; + } + }; + fclose(fd); + return SUCCESS; +} + +int needBackup(char * filename) /* fullpath */ +{ + /* we reach this point when the backup flag is true + if no_pattern -> backup all of them + if pattern + if match -> backup + nomatch -> no-backup + */ + int i; + char *p; + if (!backup) return FALSE; + if (nPattern==0) return TRUE; + + p = filename + strlen(pattern_baseDir) + 1; /* +1 to get pass the / after basedir */ + /* fprintf(stderr, "nPattern = %d file= %s\n", nPattern, p); ********/ + for(i=0; i<nPattern; ++i) { + if (regexec(fPatterns[i], p, (size_t)0, NULL, 0)!=0) + continue; /* no match */ + return TRUE; + } + return FALSE; +} + diff --git a/cmdToTarget.py b/cmdToTarget.py new file mode 100755 index 0000000..ccf042a --- /dev/null +++ b/cmdToTarget.py @@ -0,0 +1,43 @@ + +# send cmds to a target machine: +# docmd(machine, cmd) +# return (1, output) when succeeds, (0, []) when fails + +import os, time, popen2, tempfile; + +secsForWait = 30; +def waitForJob(p, machine): + count = 0; + while (p.poll()): + time.sleep(1); + count += 1; + if count == secsForWait: + killJobs = "kill -9 `ps -ef | grep %s | egrep -v 'grep|python' "\ + "| awk '{ print $2}'` 2>/dev/null" % machine; + os.system(killJobs); + return 0; + return 1; + +def docmd(rsh, machine, cmd): + # if scheme tmpfile is not used, p will fail to return if the + # output of cmd is large + tmpFile = tempfile.mktemp(); + p = popen2.Popen3("%s %s '%s' > %s 2>/dev/null" % (rsh, machine, cmd, tmpFile)); + + if not waitForJob(p, machine): return (0, []); + + lines = open(tmpFile).readlines(); + os.remove(tmpFile); + return (1, lines); + +# check if the machine can be talked to by rsh +def isMachineOK(rsh, machine): + p = popen2.Popen3("%s %s 'date 2>/dev/null'" % (rsh, machine)); + + if not waitForJob(p, machine): 
return False; + + lists = p.fromchild.readlines(); + + if len(lists)==0: return False; + return True; + diff --git a/complaint_sender.c b/complaint_sender.c new file mode 100644 index 0000000..53cfdb4 --- /dev/null +++ b/complaint_sender.c @@ -0,0 +1,158 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+*/ + +#include "main.h" + +/* complaint_send_socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +extern int my_FLOW_PORT; +extern int verbose; + +int seq = 0; + +/* send buffer */ +char complaint_buffer[FLOW_BUFFSIZE]; +int *ccode_ptr; /* complain code ---- see main.h */ +int *cmid_ptr; /* which machine*/ +int *cfile_ptr; /* which file -- for missing page */ +int *npage_ptr; /* # of pages -- for missing page */ +int *pArray_ptr; /* missing page arrary */ +int *fill_ptr; /* point to next array element */ + +/* ---------------------------------------------------------------- + routines to fill in pArray with the missing page indexes + --------------------------------------------------------------- */ +void fill_in_int(int i) +{ + *fill_ptr++ = htonl(i); +} + +void init_fill_ptr() +{ + fill_ptr = pArray_ptr; +} + +/*---------------------------------------------------------- + init_complaint_sender initializes the buffer to allow the + catcher to send complaints back to the sender. 
+ + ret_address of sender to whom we will complain + is determined when we receive the first UDP data + in read_handle_page() in page_reader.c + ----------------------------------------------------------*/ +void init_complaint_sender() /* (struct sockaddr_in *ret_addr) */ +{ + /* ret_addr is sent by master, in network-byte-order */ + if (verbose>=2) + fprintf(stderr, "in init_complaint_sender\n"); + /* init the send_socket */ + complaint_fd = complaint_socket(&complaint_addr, my_FLOW_PORT); + + /* set up the pointers so we know where to put complaint_data */ + ccode_ptr = (int *) complaint_buffer; + cmid_ptr = (int *)(ccode_ptr + 1); + cfile_ptr = (int *)(cmid_ptr + 1); + npage_ptr = (int *)(cfile_ptr + 1); + pArray_ptr= (int *)(npage_ptr + 1); +} + +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin_addr); +} +#else +void update_complaint_address(struct sockaddr_in6 *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin6_addr); +} +#endif + +/*------------------------------------------------------------------------ + send_complaint fills the complaint buffer and send it through our socket + back to the sender + + The major use is to tell master machine which pages of which file + needs to be re-transmitted. + complaint -- the complain code defined in main.h + mid -- machine id + file -- the file index + npage -- # of missing pages + followed by an array of missing page index [ page_1, page_2, ... ] + + It is also used for sending back acknoledgement. + complaint -- the ack code defined in main.h in the same complaint section. 
+ mid -- machine id + file -- which file + page -- seq number (out of seq complaints will be ignored by the catcher) + ------------------------------------------------------------------------*/ +void send_complaint(int complaint, int mid, int page, int file) +{ + /* fill in the complaint data */ + /* 20060323 add converting to network byte-order before sending out */ + int bytes; + *ccode_ptr = htonl(complaint); + *cmid_ptr = htonl(mid); + *cfile_ptr = htonl(file); + if (complaint==MISSING_PAGE || complaint==MISSING_TOTAL) { + *npage_ptr = htonl(page); + } else { + *npage_ptr = htonl(seq++); + } + + bytes = (complaint==MISSING_PAGE) ? ((char*)fill_ptr - (char*)ccode_ptr) + : (char*)pArray_ptr - (char*)ccode_ptr; + + /* send it */ + if(sendto(complaint_fd, complaint_buffer, bytes, 0, + (const struct sockaddr *)&complaint_addr, + sizeof(complaint_addr)) < 0) { + perror("Sending complaint\n"); + } + if (verbose>=2) + printf("Sent complaint:code=%d mid=%d page=%d file=%d bytes=%d\n", + complaint, mid, page, file, bytes); +} + + + + + + + + diff --git a/complaints.c b/complaints.c new file mode 100644 index 0000000..fb48da2 --- /dev/null +++ b/complaints.c @@ -0,0 +1,579 @@ +/* + Copyright (C) 2005-2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +/*#include "paths.h"*/ +#include <sys/types.h> +#include <time.h> +#include <setjmp.h> +#include <signal.h> + +extern int monitorID; /* defined in multicaster.c */ +extern int my_FLOW_PORT; +extern int verbose; +extern int nPages; +extern char * cmd_name[]; +extern unsigned int total_pages, real_total_pages; +extern off_t total_bytes, real_total_bytes; + +/* buffer for receiving complaints */ +char flow_buff[FLOW_BUFFSIZE]; +int *code_ptr; /* What's wrong? */ +int *mid_ptr; /* machine id */ +int *file_ptr; /* which file */ +int *npage_ptr; /* # of pages */ +int *pArray_ptr;/* missing page arrary */ + +/* receive socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +/* status */ +char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */ +int *last_seq; /* array of size nMachines -- seq number of complaints */ + /* *** watch out the max seq = 2e9 */ +int *total_missing_page; /* array of size nMachines -- persistent thru life of program*/ +char *file_received; /* array of size nMachines */ +char *bad_machines; /* array of size nMachines -- persistent thru life of program*/ +char *machine_status; /* array of size nMachines for ack */ +int *missing_pages; /* array of size nMachines */ + +int nMachines; +int has_missing; /* some machines have missing pages for this file (a flag)*/ +int has_sick; /* some machines are sick for this file (a flag)*/ +int skip_count=0; /* number of files 
that are not delivered */ +int quitWithOneBad=FALSE; /* default: continue with one or more (<all) bad machine */ + +/* flow control */ +int my_ACK_WAIT_TIMES = ACK_WAIT_TIMES; + +/* + init_complaints initializes our buffers to receive complaint information + from the catchers +*/ +void init_complaints() +{ + int rcv_size; + + if (verbose>=2) + fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE); + + /* get pointers set to the right place in buffer */ + code_ptr = (int *) flow_buff; + mid_ptr = (int *) (code_ptr + 1); + file_ptr = (int *) (mid_ptr + 1); + npage_ptr = (int *) (file_ptr + 1); + pArray_ptr= (int *) (npage_ptr+1); + + /* Receive socket (the default buffer size is 65535 bytes */ + if (verbose>=2) printf("set up receive socket for complaints\n"); + complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT); + + /* + getsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &i, &il); + printf(" rcvbuf = %d type = %d\n", i, il); + exit(0); + the default in our machines -> size = 65535 and type = 4 + */ + + rcv_size = TOTAL_REC_PAGE * FLOW_BUFFSIZE; + if (setsockopt(complaint_fd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){ + perror("Expanding receive buffer for init_complaints"); + } +} + +void init_missing_page_flag(int n) +{ + int i; + nPages = n; + if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) { + fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n); + perror("error = "); + exit(0); + } + for(i=0; i<nPages; ++i) { + missing_page_flag[i] = RECEIVED; + } +} + +void page_sent(int page) +{ + missing_page_flag[page] = RECEIVED; +} + +void free_missing_page_flag() +{ + free(missing_page_flag); + missing_page_flag = NULL; +} + +void set_has_missing() +{ + has_missing = TRUE; +} + +void reset_has_missing() +{ + has_missing = FALSE; +} + +void set_has_sick() +{ + has_sick = TRUE; +} + +void reset_has_sick() +{ + has_sick = FALSE; +} + +int has_sick_machines() +{ + return has_sick; +} + +int has_missing_pages() +{ /* 
originally, this fx use missing_page_flag[] + That will not work if the master did not receive missing page info */ + return has_missing; +} + +void refresh_missing_pages() +{ + int i; + for(i=0; i<nMachines; ++i) missing_pages[i] = 0; +} + +void refresh_machine_status() +{ + int i; + for(i=0; i<nMachines; ++i) machine_status[i] = NOT_READY; +} + +void mod_machine_status() +{ + /* take into account of machines which have received this file */ + int i; + for(i=0; i<nMachines; ++i) { + if (file_received[i]==FILE_RECV) machine_status[i]=MACHINE_OK; + } +} + +void refresh_file_received() +{ + int i; + for(i=0; i<nMachines; ++i) file_received[i] = NOT_RECV; +} + +int nNotRecv() +{ + int i, c; + c = 0; + for(i=0; i<nMachines; ++i) { + if (file_received[i] == NOT_RECV) ++c; + } + return c; +} + +int iNotRecv() +{ + /* find the first NotRecv machine */ + int i; + for(i=0; i<nMachines; ++i) { + if (file_received[i] == NOT_RECV) return i; + } + return -1; +} + +void init_machine_status(int n) +{ + int i; + nMachines = n; + machine_status = malloc(n * sizeof(char)); + refresh_machine_status(); + + bad_machines = malloc(n * sizeof(char)); + for(i=0; i<nMachines; ++i) { + bad_machines[i] = GOOD_MACHINE; + } + + total_missing_page = malloc(n * sizeof(int)); + for(i=0; i<nMachines; ++i) { + total_missing_page[i] = 0; + } + + missing_pages = malloc(n * sizeof(int)); + for(i=0; i<nMachines; ++i) { + missing_pages[i] = 0; + } + + file_received = malloc(n * sizeof(char)); + refresh_file_received(); + + skip_count = 0; + + last_seq = malloc(n * sizeof(int)); + for(i=0; i<nMachines; ++i) { + last_seq[i] = -1; + } +} + +int get_total_missing_pages(int n) +{ + return total_missing_page[n]; +} + +int read_handle_complaint(int cmd) +{ + /* + cmd = cmd_code -- each cmd expects different 'response' (complaints) + return 1 for complaint handled + return 0 for irrelevant complaint + return -1 for time-out + */ + int mid_v, code_v, file_v, npage_v, bytes_read; + + if (readable(complaint_fd)) { 
+ /* There is a complaint */ + bytes_read = recvfrom(complaint_fd, flow_buff, FLOW_BUFFSIZE, 0, NULL, NULL); + + /* 20060323 deal with big- vs little-endian issue + convert incoming integers into host representation */ + + mid_v = ntohl(*mid_ptr); + + if ((bytes_read < FLOW_HEAD_SIZE) || (mid_v < 0 ) || + (mid_v >= (unsigned int) nMachines) || /* boundary check for mid_v for safety */ + (bad_machines[mid_v] == BAD_MACHINE)) { /* ignore complaint from a bad machine*/ + return 0; + } + + code_v = ntohl(*code_ptr); + file_v = ntohl(*file_ptr); + npage_v = ntohl(*npage_ptr); + + /* check if the complaint is for the current file */ + if (code_v != MONITOR_OK && file_v != current_entry()) return 0; + /* out of seq will be ignored */ + if (code_v != MISSING_PAGE && code_v != MISSING_TOTAL) { /********* MISSING_TOTAL ? *************/ + if (npage_v <= last_seq[mid_v]) return 0; + else last_seq[mid_v] = npage_v; + } + + switch (code_v) { + case PAGE_RECV: + /******** check if machineID is the one we have set. */ + /*if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID);*/ + if (cmd == SENDING_DATA && mid_v == monitorID) + return 1; + else + return 0; + + case MONITOR_OK: + /********* check if machineID is the one we have set. 
*/ + if (verbose>=2) fprintf(stderr, "mid_ptr-> %d, monitorid = %d\n", mid_v, monitorID); + if (cmd == SELECT_MONITOR_CMD && mid_v == monitorID) + return 1; + else + return 0; + + case OPEN_OK : + if (cmd == OPEN_FILE_CMD) { + machine_status[mid_v] = MACHINE_OK; + return 1; + } else { + return 0; + } + + case CLOSE_OK : + if (cmd == CLOSE_FILE_CMD || cmd == CLOSE_ABORT_CMD) { + machine_status[mid_v] = MACHINE_OK; + return 1; + } else { + return 0; + } + + case EOF_OK : + if (cmd == EOF_CMD && file_received[mid_v]==NOT_RECV) { + machine_status[mid_v] = MACHINE_OK; + file_received[mid_v] = FILE_RECV; + return 1; + } else { + return 0; + } + + case MISSING_PAGE : + if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0; + if (npage_v > nPages) return 0; + { + int i, *pi, page_v; + pi = pArray_ptr; + for (i = 0; i<npage_v; ++i) { + page_v = ntohl(pi[i]); + if (page_v<1 || page_v > nPages) continue; /*** make sure page_v starts with 1*/ + missing_page_flag[page_v-1] = MISSING; + } + } + missing_pages[mid_v] += npage_v; + set_has_missing(); + return 1; + + case MISSING_TOTAL: + if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV || machine_status[mid_v] == MACHINE_OK) + return 0; + /* Consider to add: if npage_v >missing_pages[mid_v], ask to resend + [ likely no big gain ] */ + total_missing_page[mid_v] += npage_v; + set_has_missing(); /* store the info about missing info */ + machine_status[mid_v] = MACHINE_OK; /* machine_status serves as ack only */ + return 1; + + case SIT_OUT : + if (cmd != EOF_CMD || file_received[mid_v]==FILE_RECV) return 0; + fprintf(stderr, "*** %s sits-out-receiving %s\n", + id2name(mid_v), getFilename()); + machine_status[mid_v] = MACHINE_OK; + + if (!has_sick) ++skip_count; + set_has_sick(); + return 1; + + default : + if (verbose>=2) fprintf(stderr, "Unknown complaint: code = %d\n", code_v); + return 0; + } /* end of switch */ + } /* end of if(readable) */ + + /* time out of readable() */ + return -1; +} + +int all_machine_ok() +{ 
+ int i; + for(i=0; i<nMachines; ++i) { + if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) + return FALSE; /* there is at least one machine that has not sent ack */ + } + return TRUE; /* all machines are ready */ +} + +/* this is for the master to receive the acknowledgement. */ +void wait_for_ok(int code) +{ + int i, count, resp; + time_t tloc; + time_t rtime0, rtime1; + + rtime0 = time(&tloc); /* reference time */ + + count = 0; + while (!all_machine_ok()) { + resp = read_handle_complaint(code); + if (resp==1) { /* if there is a complaint handled */ + rtime0 = time(&tloc); /* reset the reference time */ + continue; + } + + if (resp==0) { /* irrelevant complaint received */ + continue; + } + + /* no complaints handled within the time period set by set_delay() */ + rtime1 = time(&tloc); /* time since last complaints */ + if ((rtime1-rtime0) >= ACK_WAIT_PERIOD) { + ++count; + if (count < my_ACK_WAIT_TIMES) { + if (verbose>=1 && (count % 10 == 0)) + fprintf(stderr, " %d: resend cmd(%s) to machines:[ ", count, cmd_name[code]); + for(i=0; i<nMachines; ++i) { + if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) { + if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "%d ", i); + send_cmd(code, (int) i); + usleep(FAST); + } + } + if (verbose>=1 && (count % 10 == 0)) fprintf(stderr, "]\n"); + rtime0 = rtime1; + } else { /* allowable period of time has passed */ + fprintf(stderr, " Drop these bad machines:[ "); + for(i=0; i<nMachines; ++i) { + if (machine_status[i] == NOT_READY && bad_machines[i] == GOOD_MACHINE) { + /* fprintf(stderr, "%d ", i); */ + fprintf(stderr, "%s ", id2name(i)); + bad_machines[i]= BAD_MACHINE; + /* + The pages sent by the master will be ignored by + the bad machine, because the current_file nubmer + does not match. + */ + } + } + fprintf(stderr, "]\n"); + break; + } + } + } +} + +int is_it_missing(int page) +{ + return (missing_page_flag[page]==MISSING) ? 
TRUE : FALSE; +} + +int pr_missing_pages() +{ + int i, N, exit_code=0; + off_t delta; + unsigned int dp; + + for(i=0; i<nMachines; ++i) { + char name[PATH_MAX]; + N = get_total_missing_pages(i); + strcpy(name, id2name(i)); + if (strlen(name)==0) sprintf(name, "machine(%3d)", i); + fprintf(stderr, "%s: #_missing_page_request = %6.2f%% = %d\n", + name, (double)N/((double)total_pages)*100.0, N); + } + + if (skip_count>0) { + fprintf(stderr, "\nWarning: There are %d files which are not delivered.\n", skip_count); + exit_code = -1; + } + + fprintf(stderr, "\nTotal number of files = %12d Pages w/o ack = %12u (%6.2f%%)\n", + total_entries(), pages_wo_ack(), (double)pages_wo_ack()/(double)real_total_pages*100.0); + + dp = real_total_pages - total_pages; + fprintf(stderr, "Total number of pages = %12d Pages re-sent = %12u (%6.2f%%)\n", + total_pages, dp, (double)dp/(double)total_pages*100.0); + + delta = (off_t)(real_total_bytes - total_bytes); + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "Total number of bytes = %12llu Bytes re-sent = %12llu (%6.2f%%)\n", + total_bytes, delta, (double)delta/(double)total_bytes*100.0); + #else + fprintf(stderr, "Total number of bytes = %12d Bytes re-sent = %12u (%6.2f%%)\n", + total_bytes, delta, (double)delta/(double)total_bytes*100.0); + #endif + + return (exit_code); +} + +/* count the number of bad machines */ +int nBadMachines() +{ + int i, count = 0; + for(i=0; i<nMachines; ++i) { + if (bad_machines[i] == BAD_MACHINE) ++count; + } + + return count; +} + +int choose_print_machines(char *stArray, char selection, char * msg_prefix) +{ + int i, count = 0; + + for(i=0; i<nMachines; ++i) { + char line[PATH_MAX]; + if (stArray[i] == selection) { + ++count; + strcpy(line, id2name(i)); + if (count == 1) { fprintf(stderr, msg_prefix); } + if (strlen(line)==0) + fprintf(stderr, "%d ", i); + else + fprintf(stderr, "%s ", line); + } + } + if (count > 0) fprintf(stderr, "]\n"); + return count; +} + +int send_done_and_pr_msgs(double total_time, 
double t_page) +{ + int exit_code1 =0; + int exit_code2 =0; + int exit_code3 =0; + + send_all_done_cmd(); + + /* exit_code1 !=0 if there are files that were not delivered due to change or skipped */ + exit_code1 = pr_missing_pages(); + + fprintf(stderr, "Total time spent = %6.2f (min) ~ %6.2f (min/GB)\n\n", + total_time, total_time / ((double)real_total_bytes/1.0e9)); + fprintf(stderr, "Send pages time = %6.2f (min) ~ %6.2f (min/GB)\n\n", + t_page, t_page / ((double)real_total_bytes/1.0e9)); + + exit_code2 = choose_print_machines(bad_machines, + BAD_MACHINE, + "Not synced for bad machines:[ "); + + if (quitWithOneBad && nBadMachines() >=1) { + fprintf(stderr, "We choose to exit when at least one target is bad\n"); + fprintf(stderr, "All files following the current one did not get delivered\n"); + fprintf(stderr, "If resend cmd(CLOSE_FILE), then the current file may have been delivered to non-bad targets\n\n"); + } + + if (current_entry() < total_entries()) { /* if we exit prematurely */ + exit_code3 = choose_print_machines(machine_status, + NOT_READY, "\nNot-ready machines:[ "); + } + + if (verbose>=1) pr_rtt_hist(); + return (exit_code1+exit_code3); /* 200807 removed exit_code2 because bad machines case has been dealt with + by -q. If no -q, then the bad machines are considered 'harmless' */ +} + +/* to do some cleanup before exit IF all machines are bad */ +void do_badMachines_exit() +{ + if ((quitWithOneBad && nBadMachines() < 1) || + (!quitWithOneBad && (nBadMachines() < nMachines))) return; + + if (quitWithOneBad) + fprintf(stderr, "One (or more) machine is bad. Exit!\n"); + else + fprintf(stderr, "All machines are bad. 
Exit!\n"); + + send_done_and_pr_msgs(-1.0, -1.0); + exit(-1); +} + +void do_cntl_c(int signo) +{ + fprintf(stderr, "Control_C interrupt detected!\n"); + + send_done_and_pr_msgs(-1.0, -1.0); + exit(-1); +} diff --git a/file_operations.c b/file_operations.c new file mode 100644 index 0000000..d9a8733 --- /dev/null +++ b/file_operations.c @@ -0,0 +1,800 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + This file contains all the file-operations for multicatcher. + 20060404: + found a undetected omission in open_file(). + AFter lseek(), we should write a dummy byte so that + multicatcher.zzz has the right file size to start with. + Otherwise, it will grow as syncing progresses. + + Port the code to deal with Large_files. + esp in write_page(), + lseek(fout, (off_t)(page-1)*(off_t)PAGE_SIZE, SEEK_SET) + 200603: + Remove the meta-data operation. + Each file's info (stat) is transfered to targets during + the OPEN_FILE_CMD. extract_file_info() in this file + is to get that stat info for the current entry(file). + + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Previously, memory mapped file was used for file IO + but later was changed to simple open() and lseek(), write(). + This was also echoed in a patch by Clint Byrum <clint@careercast.com>. + + Copyright (C) 2001 Renaissance Technologies Corp. 
+ main developer: HP Wei <hp@rentec.com> + This file was originally called wish_list.c + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include <libgen.h> +#include "main.h" + +extern int verbose; +extern int machineID; + +char * baseDir = NULL; +char * missingPages=NULL; /* starting address of the array of flags */ +int fout; /* file discriptor for output file */ + +int toRmFirst = FALSE; /* remove existing file first and then sync */ +/* off_t total_bytes_written;*/ +unsigned int nPages; +int had_done_zero_page; +int current_file_id = 0; /* 1, 2, 3 ... or -1, -2 ... 
for backup */ +int backup; /* flag for a file that needs backup when deletion */ +char *backup_suffix = NULL; +char my_backup_suffix[] = "_mcast_bakmmddhhmm"; + +/* file stat_info which is transmitted from master */ +mode_t stat_mode; +nlink_t stat_nlink; +uid_t stat_uid; +gid_t stat_gid; +off_t stat_size; +time_t stat_atime; +time_t stat_mtime; + +char filename[PATH_MAX]; +char linktar[PATH_MAX]; +char fullpath[PATH_MAX]; +char tmp_suffix[L_tmpnam]; + +void default_suffix() +{ + time_t t; + struct tm tm; + time(&t); + localtime_r(&t, &tm); + sprintf(my_backup_suffix, "_mcast_bak%02d%02d%02d%02d", + tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min); + backup_suffix = my_backup_suffix; + return; +} + +void get_tmp_suffix() +{ + /* this is called once in each multicatcher */ + char tmp[L_tmpnam]; + tmpnam_r(&tmp[0]); + strcpy(tmp_suffix, basename(tmp)); +} + +int make_backup() +{ + char fnamebak[PATH_MAX]; + if (strlen(fullpath) + strlen(backup_suffix) > (PATH_MAX-1)) { + fprintf(stderr, "backup filename too long\n"); + return FAIL; + }; + + if (!backup) return SUCCESS; /* if not match with pattern, skip the backup */ + + sprintf(fnamebak, "%s%s", fullpath, backup_suffix); + /* + The backup scheme is as follows. + ln file file.bak (mv file file.bak would cause file to non-exist for a short while) + mv file.new file + */ + if (link(fullpath, fnamebak) != 0) { + if (errno != ENOENT && errno != EINVAL) { + fprintf(stderr,"hardlink %s => %s : %s\n",fullpath, fnamebak, strerror(errno)); + return FAIL; + } + } + if (verbose >= 2) { + fprintf(stderr, "backed up %s to %s\n",fullpath, fnamebak); + } + return SUCCESS; +} + +void get_full_path(char * dest, char * sub_path) +{ + /* prepend the sub_path with baseDir --> dest */ + strcpy(dest, baseDir); + strcat(dest, "/"); + strcat(dest, sub_path); +} + +void get_tmp_file(char * tmp) +{ + /*char *fncopy;*/ + /* ******* change to filename_mcast.fileabc%$&? 
*/ + /* fncopy = strdup(filename); dirname change the string content */ + strcpy(tmp, baseDir); + strcat(tmp, "/"); + strcat(tmp, filename); + strcat(tmp, "_"); + strcat(tmp, TMP_FILE); + strcat(tmp, tmp_suffix); + /* free(fncopy); */ +} + +int my_unlink(const char *fn) +{ + if (verbose>=2) + fprintf(stderr, "deleting file: %s\n", fn); + if (unlink(fn) != 0) { + if (errno==ENOENT) { + return SUCCESS; + } else { + /* NOTE: unlink() could not remove files which do not have w permission!*/ + /* resort to shell command */ + char cmd[PATH_MAX]; + sprintf(cmd, "rm -f %s", fn); + if (system(cmd)!=0) { + fprintf(stderr, "'rm -f' fails for %s\n", fn); + return FAIL; + } + } + } + return SUCCESS; +} + +int my_touch(const char*fn) +{ + int fo; + if( (fo = open(fn, O_RDWR | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) < 0) { + perror(fn); + return FAIL; + } + + /* set the size of the output file */ + if(lseek(fo, 0, SEEK_SET) == -1) { + fprintf(stderr, "cannot seek for %s\n", fn); + perror(fn); + return FAIL; + } + close(fo); + return SUCCESS; +} + +int extract_file_info(char * buf, int n_file, unsigned int n_pages) +{ + /* + Along with OPEN_FILE_CMD, the data area in rec_buf contains + (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0 + where the stat_string contains the buf in + sprintf(buf, "%lu %lu %lu %lu %lu %lu %lu", st.st_mode, st.st_nlink, + st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime); + */ + char * pc = &buf[0]; + + #ifdef _LARGEFILE_SOURCE + if (sscanf(pc, "%u %u %u %u %llu %lu %lu %d", &stat_mode, &stat_nlink, + &stat_uid, &stat_gid, &stat_size, &stat_atime, &stat_mtime, + &toRmFirst) != 8) + return FAIL; + #else + if (sscanf(pc, "%u %u %u %u %lu %lu %lu %d", &stat_mode, &stat_nlink, + &stat_uid, &stat_gid, &stat_size, &stat_atime, &stat_mtime, + &toRmFirst) != 8) + return FAIL; + #endif + + /* fprintf(stderr, "size= %llu\n", stat_size); *********/ + + pc += (strlen(pc) + 1); + strcpy(filename, pc); + 
get_full_path(fullpath, filename); + + linktar[0] = '\0'; + if (S_ISLNK(stat_mode) || stat_nlink > 1) { /* if it is a softlink or hardlink */ + pc += (strlen(pc) +1); + strcpy(linktar, pc); + } + + nPages = n_pages; + current_file_id = n_file; + backup = (current_file_id < 0); + had_done_zero_page = FAIL; + /*total_bytes_written = 0;*/ + return SUCCESS; +} + +int open_file() +{ + int i; + + /* + sometimes for disk space reason, it is necessary + to first remove the file and sync. + If toReplace is true, the backup option should be off. + */ + if (toRmFirst) { + if (!my_unlink(fullpath)) { + fprintf(stderr, "Replacing "); + perror(filename); + return FAIL; + } + } + + /* fprintf(stderr, "%d %d %d\n", stat_mode, stat_nlink, stat_size); *********/ + if (S_ISREG(stat_mode) && stat_nlink == 1) { + /* if it is a regular file and not a hardlink */ + char tmpFile[PATH_MAX]; + get_tmp_file(tmpFile); + + my_unlink(tmpFile); /* make sure it's not there from previous runs */ + + if( (fout = open(tmpFile, O_RDWR | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) < 0) { + fprintf(stderr, "cannot open() %s for writing. 
\n", tmpFile); + perror(tmpFile); + return FAIL; + } + + /* set the size of the output file (see Steve's book on page 411) */ + if(lseek(fout, stat_size - 1 , SEEK_SET) == -1) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "lseek() error for %s with size = %llu\n", tmpFile, stat_size); + #else + fprintf(stderr, "lseek() error for %s with size = %u\n", tmpFile, (unsigned int)stat_size); + #endif + perror(tmpFile); + close(fout); + return FAIL; + } + if (write(fout, "", 1) != 1) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "write() error for %s with size = %llu\n", tmpFile, stat_size); + #else + fprintf(stderr, "write() error for %s with size = %u\n", tmpFile, (unsigned int)stat_size); + #endif + perror(tmpFile); + close(fout); + return FAIL; + } + } + + /* init missingPages flags */ + if (!missingPages) free(missingPages); + missingPages = malloc(sizeof(char) * nPages); + for(i=0; i < nPages; ++i) missingPages[i] = MISSING; + + if (verbose>=2) fprintf(stderr, "Ready to receive file = %s\n", filename); + return SUCCESS; +} + +int close_file() +{ + /* delete missingPages flags which was malloc-ed in open_file() */ + free(missingPages); + missingPages = NULL; + + if (S_ISREG(stat_mode) && stat_nlink == 1) { + /* if it is a regular file and not a hardlink */ + char tmpFile[PATH_MAX]; + struct stat stat; + + if ((fout != -1) && (close(fout) != 0)) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %llu \n", + filename, stat_size); + #else + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %u \n", + filename, (unsigned int)stat_size); + #endif + perror("close"); + return FAIL; + } + fout = -1; /* if the following fails, the reentry wont do munmap() */ + + /* the real work */ + /* get_full_path(oldFile, filename); **** unnecessary fullpath is the oldFile */ + + get_tmp_file(tmpFile); + + /* 8/14/2002 + If there was a hardware (disk IO) problem + the sync should not proceed. + ** Add the following checking. 
+ */ + if (lstat(tmpFile, &stat)<0) { + perror("ERROR: close_file() cannot lstat the tmp file\n"); + return FAIL; + } + + if (backup && !make_backup(fullpath)) { /* make a hardlink oldFile => backup_file */ + fprintf(stderr, "fail to make backup for %s\n", fullpath); + return FAIL; + } + if (rename(tmpFile, fullpath)<0) { + perror("ERROR: close_file():rename() \n"); + return FAIL; + } + + /* 20071016 in rare occasion, the written file has not the right size */ + if (lstat(fullpath, &stat)<0) { + fprintf(stderr, "ERROR: close_file() cannot lstat %s\n", fullpath); + return FAIL; + } + if (stat_size != stat.st_size) { + fprintf(stderr, "ERROR: close_file() filesize != incoming-size\n"); + return FAIL; + } + } + + /* for debug + if (verbose>=2) fprintf(stderr, "total bytes written %llu for file %s\n", + total_bytes_written, filename); + */ + + return SUCCESS; +} + +int rm_tmp_file() +{ + char tmpFile[PATH_MAX]; + + if ((fout != -1) && (close(fout) != 0)) { + #ifdef _LARGEFILE_SOURCE + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %llu \n", + filename, stat_size); + #else + fprintf(stderr, "ERROR: Cannot close() tmp for -- %s size= %u \n", + filename, (unsigned int)stat_size); + #endif + perror("rm_tmp"); + return FAIL; + } + fout = -1; + + get_tmp_file(tmpFile); + + return (my_unlink(tmpFile)); +} + +int nPages_for_file() +{ + return nPages; +}; + +/* return total number of missing pages */ +int get_missing_pages() +{ + int i, result=0; + + for(i=0; i < nPages; ++i) + if ((missingPages[i]) == MISSING) ++result; + return result; +} + +int is_missing(int index) +{ + return (missingPages[index] == MISSING) ? TRUE : FALSE; +} + +void page_received(int index) +{ + missingPages[index] = RECEIVED; +} + +/* + write() in write_page() may block forever. + This function is to check if write() is ready. 
+*/ +int writable(int fd) +{ + struct timeval write_tv; + fd_set wset; + FD_ZERO(&wset); + FD_SET(fd, &wset); + + write_tv.tv_sec = WRITE_WAIT_SEC; + write_tv.tv_usec = WRITE_WAIT_USEC; + return (select(fd + 1, NULL, &wset, NULL, &write_tv)==1); +} + +void write_page(int page, char *data_ptr, int bytes) +{ + /* page = page number starting with 1 */ + if (page < 1 || page > nPages) return; + + /* Do we need to write this page? */ + if (is_missing(page-1)){ + if (!writable(fout)) return; + + /* Write the data */ + if (lseek(fout, (off_t)(page-1)*(off_t)PAGE_SIZE, SEEK_SET)<0) { + if (verbose>=1) { + fprintf(stderr, "ERROR: write_page():lseek() at page %d for %s\n", + page, filename); + perror("ERROR"); + } + return; + } + if (write(fout, data_ptr, bytes)<0) { + /* write IO error !!! */ + perror("ERROR"); + fprintf(stderr, "write_page():write() error: at page %d for %s\n", page, filename); + return; + } + + /* Mark the page as received in our wish list */ + page_received(page-1); + /*total_bytes_written += bytes;*/ + } else { + /* If we don't need to write it, just return */ + if (verbose >=2) { + fprintf(stderr, "Already have page %d for %d:Ignoring\n", page, current_file_id); + } + } + return; +} + +/* For files whose size is 0 */ +int touch_file() +{ + if (verbose >=2) + fprintf(stderr, "touching file: %s\n", fullpath); + + my_unlink(fullpath); + return my_touch(fullpath); + /* system() + VERY time in-efficient + char cmd[PATH_MAX]; + sprintf(cmd, "touch %s", fullpath); + return (system(cmd)==0); + */ +} + +int delete_file(int to_check_dir_type) +{ + struct stat st; + char fp[PATH_MAX]; + int trailing_slash; + int type_checking; + + strcpy(fp, fullpath); + + /* remove trailing slash if any -- for deletion-'type' checking */ + if (to_check_dir_type) { + char *pc; + type_checking = TRUE; + pc = &fp[0] + strlen(fp) - 1; + if (*pc=='/') { + *pc = '\0'; + trailing_slash = TRUE; + } else { + trailing_slash = FALSE; + }; + } else { + type_checking = FALSE; + } + + 
if(lstat(fp, &st) < 0) { + /* already gone ? */ + return SUCCESS; + } + if (S_ISREG(st.st_mode) || S_ISLNK(st.st_mode)) { /* delete a file or link */ + if (verbose>=2) + fprintf(stderr, "deleting file: %s\n", fp); + + if (type_checking && trailing_slash) { /* intended to remove a directory when it is not */ + return FAIL; + } + + if (backup && S_ISREG(st.st_mode) && !make_backup(fp)) {/* backup regular file */ + return FAIL; /* failed to make_backup */ + } + return (my_unlink(fp)); + } else if (S_ISDIR(st.st_mode)) { /* remove a directory */ + char cmd[PATH_MAX]; + if (verbose>=2) + fprintf(stderr, "deleting directory: %s\n", fp); + + if (type_checking && (!trailing_slash)) { /* intended to remove a non-dir when it is directory */ + return FAIL; + } + + sprintf(cmd, "rm -rf %s", fp); /* remove everything in dir, watch out for this */ + return (system(cmd)==0); + } + /* not file, link, directory */ + fprintf(stderr, "unrecognized file_mode for %s\n", fp); + return FAIL; +} + +/* send complaints to the master for missing data */ +int ask_for_missing_page() +{ + int i, n=0, total=0; + + /* + Send missing page indexes if any + */ + init_fill_ptr(); + for(i=0; i < nPages; ++i) { + if (missingPages[i] == MISSING ) { + ++n; + ++total; + if (n > MAX_NPAGE) { + /* send previous missing page-indexes */ + send_complaint(MISSING_PAGE, machineID, MAX_NPAGE, current_file_id); + init_fill_ptr(); + n = 1; + } + /* fill in one page index */ + fill_in_int(i+1); /* origin = 1 */ + } + } + /* send the rest of missing pages complaint */ + if (n>0) send_complaint(MISSING_PAGE, machineID, n, current_file_id); + + return total; /* there are missing pages */ +} + +void missing_page_stat() +{ + int i, n=0, sum=0, last=-1; + + for(i=0; i < nPages; ++i) { + if (missingPages[i] == MISSING ) { + ++n; + if (last<0) { + sum += i; + } else { + sum += (i - last); + } + last = i; + } + } + if (n>0) { + double a = sum; + double b = n; + fprintf(stderr, "file= %d miss= %d out-of %d avg(delta_index) = 
%f\n", + current_file_id, n, nPages, a/b); + } +} + +void my_perror(char * msg) +{ + char fn[PATH_MAX]; + sprintf(fn, "%s - %s", fullpath, msg); + perror(fn); +} + +int set_owner_perm_times() +{ + int state = SUCCESS; + + /* set owner */ + if (lchown(fullpath, stat_uid, stat_gid)!=0) { + my_perror("chown"); + state = FAIL; + } + + /* + set time and permission. + Don't try to set the time and permission on a link + */ + if (!S_ISLNK(stat_mode)) { + struct utimbuf times; + if (chmod(fullpath, stat_mode)!=0) { + my_perror("chmod"); + state = FAIL; + } + + times.actime = stat_atime; + times.modtime = stat_mtime; + if (utime(fullpath, ×)!=0) { + my_perror("utime"); + state = FAIL; + } + } + return state; +} + +int update_directory() +{ + struct stat st; + int exists; + char fp[PATH_MAX], *pc; + + if (verbose>=2) + fprintf(stderr, "Updating dir: %s\n", fullpath); + + /* if fullpath is a softlink that points to a dir, + and it has a trailing '/', + lstat() will view it as a directory ! + So, we remove the trailing '/' before lstat() */ + strcpy(fp, fullpath); + pc = &fp[0] + strlen(fp) - 1; + if (*pc=='/') *pc = '\0'; + + if(lstat(fp, &st) < 0) { + switch(errno) { + case ENOENT: + exists = FALSE; + break; + default: + my_perror("lstat"); + return FAIL; + } + } else { + exists = TRUE; + } + + if (!exists) { + /* There's nothing there, so create dir */ + if (mkdir(fp, stat_mode) < 0){ + my_perror("mkdir"); + return FAIL; + } + return SUCCESS; + } else if (!S_ISDIR(st.st_mode)) { + /* If not a directory delete what is there */ + if (unlink(fp)!=0){ + my_perror("unlink"); + return FAIL; + } + if (verbose>=2) + fprintf(stderr, "Deleted file %s to replace with directory\n", fp); + if (mkdir(fp, stat_mode) < 0){ + my_perror("mkdir"); + return FAIL; + } + return SUCCESS; + } else { + /* If dir exists, just chmod */ + chmod(fp, stat_mode); + /*** 20070410: changing mtime of a dir can cause NFS to confuse + See http://lists.samba.org/archive/rsync/2004-May/009439.html + So, I 
comment it out in the following + + struct utimbuf times; + times.actime = stat_atime; + times.modtime = stat_mtime; + ***/ + /* + if (utime(fullpath, ×) < 0) { + my_perror("utime"); + } + */ + return SUCCESS; + } +} + +int update_directory0() +{ + DIR *d; + + struct utimbuf times; + times.actime = stat_atime; + times.modtime = stat_mtime; + + if (verbose>=2) + fprintf(stderr, "Creating dir: %s\n", fullpath); + + d = opendir(fullpath); + if (d == NULL) { + switch(errno) { + case ENOTDIR: + /* If not a directory delete what is there */ + if (unlink(fullpath)!=0){ + my_perror("unlink"); + return FAIL; + } + if (verbose>=2) + fprintf(stderr, "Deleted file %s to replace with directory\n", fullpath); + /* Fall through to ENOENT */ + case ENOENT: + /* There's nothing there, so create dir */ + if (mkdir(fullpath, stat_mode) < 0){ + my_perror("mkdir"); + return FAIL; + } + return SUCCESS; + default: + my_perror("opendir"); + return FAIL; + } + } else { + /* If dir exists, just chmod */ + closedir(d); + chmod(fullpath, stat_mode); + /*** 20070410: changing mtime of a dir can cause NFS to confuse + See http://lists.samba.org/archive/rsync/2004-May/009439.html + So, I comment it out in the following ***/ + /* + if (utime(fullpath, ×) < 0) { + my_perror("utime"); + } + */ + return SUCCESS; + } +} + +int check_zero_page_entry() +{ + /* when total_pages = 0, the entry can be + an empty file + softlink (hardlink), + a directory + */ + if (had_done_zero_page) return SUCCESS; /* to avoid doing it again */ + + if (S_ISDIR(stat_mode)) { /* a directory */ + if (!update_directory()) { + had_done_zero_page = FAIL; + return FAIL; + } + } else if (S_ISLNK(stat_mode)) { /* Is it a softlink? 
*/ + if (verbose>=2) + fprintf(stderr, "Making softlink: %s -> %s\n", fullpath, linktar); + delete_file(FALSE); /* remove the old one at fullpath */ + if (symlink(linktar, fullpath) < 0) { + my_perror("symlink"); + had_done_zero_page = FAIL; + return FAIL; + } + } else if (stat_nlink > 1) { /* hardlink */ + char fn[PATH_MAX]; + get_full_path(fn, linktar); /* linktar is a relative path from synclist */ + if (verbose>=2) + fprintf(stderr, "Making a hardlink: %s => %s\n", fullpath, fn); + my_unlink(fullpath); /* remove the old one */ + if (link(fn, fullpath)!=0) { + my_perror("link"); + had_done_zero_page = FAIL; + return FAIL; + } + } else { + /* it must be a regular file */ + + if (!touch_file()) { + had_done_zero_page = FAIL; + return FAIL; + } else { + set_owner_perm_times(); + } + } + had_done_zero_page = SUCCESS; + return SUCCESS; +} + diff --git a/global.c b/global.c new file mode 100644 index 0000000..64104eb --- /dev/null +++ b/global.c @@ -0,0 +1,35 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
/* Printable command names, indexed by command code (used in the master's
   progress messages).  The last entry is the literal text "NULL", not a
   null pointer. */
char *cmd_name[] = {
  "TIME-OUT",
  "TEST",
  "SEND_DATA",
  "RESEND_DATA",
  "OPEN_FILE",
  "EOF",
  "CLOSE_FILE",
  "CLOSE_ABORT",
  "ALL_DONE",
  "SEL_MONITOR",
  "NULL"
};

/* 0 = little output; 1, 2, ... = progressively more detail */
int verbose = 0;

/* id of this target; used for multicatcher */
int machineID = -1;
+*/ + +#include <stdio.h> +#include <stdlib.h> +#include <limits.h> +#include <string.h> + +void strip(char * str); + +/* place to hold the array of string */ +char ** machine_names = NULL; /* array of (char*) */ +int nTargets=0; + +void get_machine_names(char * filename) +{ + FILE *fd; + char line[PATH_MAX]; + int count=0; + + if ((fd = fopen(filename, "r")) == NULL) { + fprintf(stderr, "Cannot open file -- %s \n", filename); + return; + } + while (fgets(line, PATH_MAX, fd) != NULL) { + strip(line); + if (strlen(line) != 0) ++count; + } + if (count == 0) { + fclose(fd); + fprintf(stderr, "No machine names in the file = %s\n", filename); + return; + } + + nTargets = count; + + rewind(fd); + machine_names = malloc(nTargets * sizeof(void*)); + + line[0] = '\0'; + count = 0; + while(fgets(line, PATH_MAX, fd) != NULL) { + strip(line); + if (strlen(line)==0) continue; + machine_names[count] = (char*)strdup(line); + line[0] = '\0'; + ++count; + } + fclose(fd); +} + +char * id2name(int id) +{ + return (machine_names) ? machine_names[id] : ""; +} @@ -0,0 +1,189 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Following the suggestion and the patch by Clint Byrum <clint@careercast.com>, + I added more control to selectively print out messages. + The control is done by the statement 'if (version >= n)' + + Copyright (C) 2001 Renaissance Technologies Corp. 
+ main developer: HP Wei <hp@rentec.com> + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __main_h +#define __main_h + +#include <dirent.h> +#include <time.h> +#include <utime.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include <stdio.h> +#include <stdlib.h> +#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */ +#include <arpa/inet.h> /* inet(3) functions */ +#include <errno.h> +#include <fcntl.h> /* for nonblocking */ +#include <sys/ioctl.h> +#include <netdb.h> +#include <signal.h> +#include <stdio.h> +#include <string.h> +#include <sys/stat.h> /* for S_xxx file mode constants */ +#include <sys/uio.h> /* for iovec{} and readv/writev */ +#include <unistd.h> +#include <sys/wait.h> +#include <sys/time.h> /* timeval{} for select() */ +#include <sys/times.h> +#include <limits.h> + +#define VERSION "4.0.1" + +/* logic values */ +#define FALSE 0 +#define TRUE 1 +#define FAIL (FALSE) +#define SUCCESS (TRUE) +#define GOOD_EXIT 0 +#define BAD_EXIT -1 + +/* Ports and addresses */ +#define PORT 8888 /* for multicast */ +#define FLOW_PORT 
(PORT-1) /* for flow-control */ +#define MCAST_ADDR "239.255.67.92" +#define MCAST_TTL 1 +#define MCAST_LOOP FALSE +#define MCAST_IF NULL + +/* + Handling socket's receive buffer on the target machine: + if the available data size in the receiveing buffer is larger + than TOO_MUCH then a TOO_FAST complaint is triggered. + The master will then sleep for USEC_TO_IDLE + Currently, this is not effective. +*/ +#define TOO_FAST_LIMIT (TOTAL_REC_PAGE / 2) /* if half is full, then too fast */ +#define TOO_MUCH (TOO_FAST_LIMIT * PAGE_BUFFSIZE) +#define USEC_TO_IDLE 1000000 + +/* TIMING stuff */ +#define FAST 5000 /* usec */ +#define DT_PERPAGE 6000 /* usec */ +#define FACTOR 90 /* interpage interval = FACTOR * DT_PERPAGE or DT_PERPAGE*/ +#define SECS_FOR_KILL 30 /* time(sec) allowed for 'kill -9 pid' to finish */ + +/* time for the master to wait for the acknowledgement */ +#define ACK_WAIT_PERIOD 1 /* secs (from time()); */ +#define ACK_WAIT_TIMES 60 /* wait for this many periods */ + +#define SICK_RATIO (0.9) +#define SICK_THRESHOLD (50) /* SICK FOR such many TIMES is really sick */ + +/* max wait time for write() a page of PAGE_SIZE -- 100 msec */ +#define WRITE_WAIT_SEC 0 +#define WRITE_WAIT_USEC 100000 + +#define SET_MON_WAIT_TIMES 6000 /* time = this number * FAST */ +#define NO_FEEDBACK_COUNT_MAX 10 +#define SWITCH_THRESHOLD 50 /* to avoid switching monitor too frequently + because of small diff in missing_pages */ + +/* complaints */ +#define TOO_FAST 100 +#define OPEN_OK 200 +#define CLOSE_OK 300 +#define MISSING_PAGE 400 +#define MISSING_TOTAL 500 +#define EOF_OK 600 +#define SIT_OUT 700 +#define PAGE_RECV 800 +#define MONITOR_OK 900 + +/* Sizes */ +/* 20060427: removed size_t which is arch-dependent */ +#define PAGE_SIZE 64512 +#define HEAD_SIZE (sizeof(int) + 2 * sizeof(int) + 2 * sizeof(int)) +#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE) +#define TOTAL_REC_PAGE 20 /* change to 4 in case hit the OS limit in buf size */ + +#define FLOW_HEAD_SIZE (sizeof(int)*4) 
+#define FLOW_BUFFSIZE (PAGE_SIZE+FLOW_HEAD_SIZE) +#define MAX_NPAGE (PAGE_SIZE / sizeof(int)) +/* + Modes and command codes: + The numerical codes are also the index to retrieve the command names + for printing in complaints.c +*/ +#define TIMED_OUT 0 +#define TEST 1 +#define SENDING_DATA 2 +#define RESENDING_DATA 3 +#define OPEN_FILE_CMD 4 +#define EOF_CMD 5 +#define CLOSE_FILE_CMD 6 +#define CLOSE_ABORT_CMD 7 +#define ALL_DONE_CMD 8 +#define SELECT_MONITOR_CMD 9 +#define NULL_CMD 10 + +/* machine status ----- for caster */ +#define MACHINE_OK_MISSING_PAGES '\2' +#define MACHINE_OK '\1' +#define NOT_READY '\0' + +#define BAD_MACHINE '\1' +#define GOOD_MACHINE '\0' + +#define FILE_RECV '\1' +#define NOT_RECV '\0' + +/* representation of all-targets for sends */ +#define ALL_MACHINES -1 + +/* PAGE STATUS */ +#define MISSING '\0' +#define RECEIVED '\1' + +/* MACHINE STATE ----- for catcher */ +#define IDLE_STATE 0 +#define GET_DATA_STATE 1 +#define DATA_READY_STATE 2 +#define SICK_STATE 3 + +/* + The following two are info to be packed into + meta data to represent either file or directory deletion. +*/ +/* SPECIAL # of PAGES to signal deleting action */ +#define TO_DELETE (-1) + +/* temporary file name prefix for transfering to */ +#define TMP_FILE "mrsync." 
+ +#include "proto.h" + +#endif diff --git a/mrsync.py b/mrsync.py new file mode 100755 index 0000000..bd8c0c7 --- /dev/null +++ b/mrsync.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python + +import os,sys,string,time,getopt; + +my_module_path = os.path.dirname(sys.argv[0]); +sys.path.append(my_module_path); +from mrsync_config import * + +host = os.uname()[1]; + +def bin(path, bin_name): + "return the full-path of a binary code" + return '%s/%s' % (path, bin_name); + +def isPathThere(path): + "check the existence of path on this master machine" + if not os.path.exists(path): + print >>sys.stderr, 'Path (%s) could not be found on %s' % (path, host); + sys.exit(-1); + +map(isPathThere, [bin(binDir, x) for x in codes]); + +def printTimeMsg(msg): + print >>sys.stderr, 'Time = %s %s' % (time.ctime(time.time()), msg); + +def prMulticastLog(msg): + "append msg to the log file" + open(multicast_log, 'a').write('%s %s\n' % (time.ctime(time.time()), msg)); + +(catcher_err_log, goodTargetsFile, syncFileList) = \ + map(lambda x: + '%s.%s' % (x, ('%02d'*5) % time.localtime()[1:6]), + (catcher_err_log, goodTargetsFile, syncFileList)); + +def usage(): + print """mrsync.py (to sync files from one to many machines by multicast) + Option list: + [ -v <verbose_level 0-2 (2 is for debug)> ] + [ -w <ack_wait_times default= 60 (secs) ] + [ -r <remote shell to invoke multicatcher, default=rsh> ] + [ -b <remote_bin_dir_path for multicatcher, default=%s> ] + [ -o <more rsync options such as --include --exclude, + to be appended to default min_opts = %s>] + [ -x flag to turn off monitor mechanism (not fully tested and not recommended) ] + ----- Essential options -------------------------------------------------------- + -m <machine_list_file_path or csv_list (name1,name2...)> + -s <source_data_path> + [ -t <target_data_path; default = that in -s option> ] + [ -l <list of files(wildcards) to be synced, can be a filepath or csv_list> + mrsync by default uses rsync to find the list unless this 
option is given. ] + ----- mcast options ------------------------------------------------------------ + [ -A <my_MCAST_ADDR such as 239.255.67.92> ] + [ -P <my_PORT such as 9000> ] + [ -T <my_TTL default=1> ] + [ -L flag to turn on mcast_LOOP. default is off ] + [ -I <my_MCAST_IF default=NULL> ] + """ % (rBinDir, min_rsync_opt%reshell); + +# --- handle command line +opts, args = getopt.getopt(sys.argv[1:], 'hxv:w:r:b:o:m:s:t:l:A:P:T:LI:', []); + +if len(opts)==0 or len(args)>0: + usage(); + sys.exit(-1); + +verbose = 0; +ack_wait_times = -1; +without_monitor = False; +machineListFile = ''; +sourcePath = ''; +targetPath = ''; +synclist = ''; +rsync_opts = min_rsync_opt % reshell; + +mcast_addr = ''; +port = -1; +ttl = 1; +loop = False; +mcast_if = ''; + +if not len(opts) == 0: + for o,v in opts: + if o=='-v': + verbose = string.atoi(v); + elif o=='-w': + ack_wait_times = string.atoi(v); + elif o=='-h': + usage(); + sys.exit(0); + elif o=='-r': + reshell = v; + elif o=='-b': + rBinDir = v; + elif o=='-o': + rsync_opts += (' %s' % v) + elif o=='-m': + machineListFile = v; + elif o=='-s': + sourcePath = v; + elif o=='-t': + targetPath = v; + elif o=='-l': + synclist = v; + elif o=='-A': + mcast_addr = v; + elif o=='-P': + port = string.atoi(v); + elif o=='-T': + ttl = string.atoi(v); + elif o=='-L': + loop = True; + elif o=='-I': + mcast_if = v; + elif o=='-x': + without_monitor = True; + +if verbose>=1: print 'mrsync version 4.0.0'; + +if not machineListFile or not sourcePath: + print >>sys.stderr, 'Essential options (-m -s) should be specified.'; + usage(); + sys.exit(-1); + +isPathThere(sourcePath); +if not targetPath: targetPath = sourcePath; + +# ------------ get machine list +machines = (os.path.exists(machineListFile) and + [x[:-1] for x in open(machineListFile).readlines()] or + machineListFile.split(',')); + +# clean up the names +machines = filter(lambda x: x!='', machines); +machines = [x.strip() for x in machines]; + +if host in machines: + if verbose>=1: 
print 'remove myself (%s) from machine list...' % host; + machines.remove(host); + +# ------------ get the syncList from the first good machine +# the list is stored in syncFileList for multicaster. +import cmdToTarget; + +def cleanup(file): + "remove a file if it exists" + if os.path.exists(file): os.unlink(file); + +def get_synclist_from_cmdline(tmp): + "extracts synclist from cmdline option, outputs results into tmp_file" + cleanup(tmp); + if verbose>=1: print >>sys.stderr, 'extracting %s...' % synclist; + + list = (os.path.exists(synclist) and + [x[:-1] for x in open(synclist).readlines()] or + synclist.split(',')); + + if len(list)==0: + print >>sys.stderr, "Empty sync_list in cmd_line option %s" % synclist; + sys.exit(-1); + + def pr_relative_path(fullpath): + open(tmp, 'a').write('%s\n' % fullpath.replace(sourcePath+'/', '')); + + for item in list: + " each item can be a pattern for files" + import glob + map(pr_relative_path, glob.glob('%s/%s' % (sourcePath, item))); + + +def get_synclist_from_rsync(tmp): + "use rsync to get a list of to-be-synced files. results are put in tmp file" + cleanup(tmp); + for machine in machines: + # check if this machine is ok to rsh (ssh) + if (not cmdToTarget.isMachineOK(reshell, machine)): # if not go to next machine + continue; + + if verbose>=1: + print >>sys.stderr, 'Get to-be-synced files from %s...' 
% machine; + + cmd = ' '.join(filter(lambda x: x, + [rsyncPath, '--rsync-path='+remote_rsyncPath, \ + (reshell != 'rsh' and \ + rsync_opts.replace('--rsh=rsh', '--rsh=%s' % reshell) \ + or rsync_opts), + sourcePath+'/', + '%s:%s/' % (machine, targetPath), + '> %s 2>&1; ' % tmp])); + + if verbose>=1: print >>sys.stderr, cmd; + os.system(cmd); + break; + +def tmp_synclist(): + "intermediate synclist filename" + return '/tmp/%s' % os.path.basename(syncFileList); + +(synclist and get_synclist_from_cmdline or get_synclist_from_rsync)(tmp_synclist()); + +def ckFileSize(file): + if os.path.getsize(file)==0: + print >>sys.stderr, "Empty file = %s" % file; + sys.exit(-1); + +ckFileSize(tmp_synclist()); + +# translate the files generated by rsync or command-line option into +# a format which can be recognized by multicaster. +cmd = ' '.join([bin(binDir, translate), + tmp_synclist(), sourcePath, '>', syncFileList]); +if verbose>=1: print >>sys.stderr, cmd; +os.system(cmd); + +ckFileSize(syncFileList); + +# ------------ invoke multicatcher on all target machines +def gen_catcher_cmd(count): + "return mulitcatcher_command to be invoked on target machines" + return ' '.join(filter(lambda x: x, + [bin(rBinDir, catcher), + '-t', targetPath, + '-i', '%d'%count, # machine id + (mcast_addr and '-A '+mcast_addr or ''), + (port>0 and '-P %d'%port or ''), + (mcast_if and '-I %s'%mcast_if or ''), + '</dev/null 1>/dev/null 2>%s &' % catcher_err_log])); ### workaround ssh problem + +def invoke_catcher(ms, count, bads): + "invoke catcher for each machine in ms, return bad_machines in bads" + if not ms: return bads; + + machine = ms.pop(0); + + if (not cmdToTarget.isMachineOK(reshell, machine)): + if verbose>=1: print >>sys.stderr, "***%s is down" % machine; + bads.append(machine); + return invoke_catcher(ms, count, bads); + + cmd = gen_catcher_cmd(count); + if count==0 and verbose>=1: print >>sys.stderr,cmd; + status, output = cmdToTarget.docmd(reshell, machine, cmd); + + if (not status): + 
if verbose>=1: print >>sys.stderr, "***remote shell exec failed for %s" % machine; + bads.append(machine); + return invoke_catcher(ms, count, bads); + + if verbose>=1: print >>sys.stderr, 'id:%3d %s' % (count, machine); + open(goodTargetsFile, 'a').write('%s\n' % machine) + return invoke_catcher(ms, count+1, bads); + +printTimeMsg("Invoking multicatcher on all %d machines..." % len(machines)); +cleanup(goodTargetsFile); +badMachines = invoke_catcher(machines, 0, []); + +# -------------- invoke multicast on the master machine +printTimeMsg('Starting multicasting...'); +prMulticastLog('start multicast on %s' % host); + +def gen_caster_cmd(): + "return mulitcaster_command to be invoked on this host (master machine)" + return ' '.join(filter(lambda x: x, + [bin(binDir, caster), + '-v %d' % verbose, + '-m %s' % goodTargetsFile, + '-s %s' % sourcePath, + '-f %s' % syncFileList, + (ack_wait_times>0 and '-w %d'% ack_wait_times or ''), + (mcast_addr and '-A '+mcast_addr or ''), + (port>0 and '-P %d'%port or ''), + (ttl>1 and '-T %d'%ttl or ''), + (loop and '-L' or ''), + (mcast_if and '-I %s'%mcast_if or ''), + (without_monitor and '-x' or '')])); + +cmd = gen_caster_cmd(); +if verbose>=1: print cmd; +ex_code = os.system(cmd); +print >>sys.stderr, 'ex_code= ', ex_code; + +# -------------- to exit +printTimeMsg('ALL DONE.'); +prMulticastLog('multicast ends on %s' % host); +sys.exit(ex_code); diff --git a/mrsync_config.py b/mrsync_config.py new file mode 100644 index 0000000..5953e24 --- /dev/null +++ b/mrsync_config.py @@ -0,0 +1,67 @@ +# configuration file + +# used by mrsync.py to find the executables multicaster, multicatcher, trFilelist +# hopefully, all machines put them in the same place as on the master machine. +binDir = '/a/path/name'; # bin +rBinDir = binDir; # for remote target machines. + +# This is a general bookkeeping file for mrsync.py. +# It records start_time and end_time of a mrsync session. +# Each mrsync.py appends time info into this file. 
+# So it should be a path that can be accessed by any machine invoking mrsync.py + +multicast_log = '/path/multicast.log'; + +#--------------------------------------------------------------------------- +# The following three file-names are for tmp storage +# +# To allow for multiple multicast sessions at the same time, the actual +# name of the file will be that specified here plus a suffix = .mmddhhmmss +# The latter (mmddhhmm) is generated by mrsync.py +# --> It is necessary to clean up old files :)) +# + +# This is a path on all target machines for a running multicatcher +# to catch its stderr output if any. Usually we don't expect any output in this file. +# It's there for debugging purpose in case any error event occurs. +# +# Set it to /dev/null, if we don't want any output. +# +catcher_err_log = '/a_path/catcher.err'; + +# The goodTargetsFile is for mrsync.py to output names of the machines that are accessible. +# It is read by multicaster. +caster_log_dir = '/A_PATH/sync/'; +goodTargetsFile = caster_log_dir + 'syncing'; + +# The file path for storing list of to-be-transfered files +# It is used by multicaster. +syncFileList = caster_log_dir + 'syncfile.lst'; +# +# --------------------------------------------------------------------------- +# + +# Path to find the rsync binary +rsyncPath = '/usr/local/bin/rsync'; +remote_rsyncPath = rsyncPath; + +# +reshell = "rsh"; # can be changed thru command line option + +# +# Do NOT change the following unless you know what you are doing. +# +# This is the absolute minimum of rsync options for multicaster +# rsync is used only to find files that need to be synced. 
+# -- to add other options such as --exclude --include, use command line option +# those extra options will be appended to this min_rsync_opt +min_rsync_opt = '--rsh=%s -avW --dry-run --delete'; + +# +# varioius binary codes used +# +# the binary to translate file_list generated by rsync or manually for multicaster +translate = 'trFilelist'; +catcher = 'multicatcher'; +caster = 'multicaster'; +codes = [ translate, catcher, caster]; diff --git a/multicaster.c b/multicaster.c new file mode 100644 index 0000000..3b26cae --- /dev/null +++ b/multicaster.c @@ -0,0 +1,582 @@ +/* + Copyright (C) 2006-2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + version 3.0 major update + -- large file support + -- platform independence (between linux, unix) + -- backup feature (as in rsync) + -- removing meta-file-info + -- catching slow machine as the feedback monitor + -- mcast options + version 3.0.[1-9] bug fixes + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwanranted SIT-OUT cases. + -- tested on Debian 64 bit arch by Nicolas Marot in France + version 3.1.0 + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. + version 3.2.0 + -- monitor change improvement + -- handshake improvement (e.g. seq #) + -- if one machine skips a file, all will NOT close() + version 4.0 major update + -- consolidate sending missing pages in complaint flow + cutting the messages by one order magnitude + -- exit code adjustment + + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. 
+ main developer: HP Wei <hp@rentec.com> + This file was modified in 2001 from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include <stdlib.h> +#include <limits.h> /* to define PATH_MAX */ +#include <sys/times.h> +#include <setjmp.h> +#include <signal.h> + +extern int verbose; +extern char * machine_list_file; /* defined in complaints.c */ +extern char * bad_machines; /* array of size nMachines, defined in complaints.c */ +extern char * file_received; +extern int * missing_pages; +extern int backup; +extern int nPattern; +extern char * pattern_baseDir; +extern int nTargets; +extern int my_ACK_WAIT_TIMES; +extern int toRmFirst; +extern int quitWithOneBad; +extern int skip_count; +extern int file_changed; + +char * my_MCAST_ADDR = MCAST_ADDR; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; +int my_TTL = MCAST_TTL; +int my_LOOP = MCAST_LOOP; +char * my_IFname = MCAST_IF; + +int monitorID = -1; + +int no_feedback_count; + +void usage() +{ + fprintf(stderr, + "multicaster (to copy files to many multicatchers) version %s)\n" + " Option list:\n" + " [ -v <verbose_level 0-2> ]\n" + " [ -w <ack_wait_times 
default= %d (secs) ]\n" + " [ -X toRmFirst_flag default= cp2tmp_and_mv ]\n" + " [ -q quit_with_1_bad defalut= quit_with_all_bad ]\n" + " -------- essential options ----------------------------------\n" + " -m <machine_list_filePath>\n" + " -s <data_source_path>\n" + " -f <synclist_filePath>\n" + " -------- options for backup ---------------------------------\n" + " [ -b flag to turn on backup ]\n" + " [ -r <filepath> for regex patterns for files needing backup ]\n" + " [ -d <basedir> for regex patterns ]\n" + " -------- mcast options --------------------------------------\n" + " [ -A <my_mcast_address default=%s> **same as for multicatcher ]\n" + " [ -P <my_PORT default=%d> **same as for multicatcher ]\n" + " [ -T <my_TTL default=%d> ]\n" + " [ -L flag turn on mcast_LOOP. default is off ]\n" + " [ -I <my_MCAST_IF default=NULL> ]\n", + VERSION, my_ACK_WAIT_TIMES, MCAST_ADDR, PORT, my_TTL); +} + + +int monitor_cmd(int cmd, int machineID) +{ + /* + Once this fx is called the old monitor will be + turned off. So, we need to make sure this fx + returns TRUE for a machine. Or the monitor_set_up + should fail. + */ + int count =0, resp; + set_delay(0, FAST); + send_cmd(cmd, machineID); + while (1) { /* wait for ack */ + resp = read_handle_complaint(cmd); + if (resp<0) { /* time-out */ + ++count; + send_cmd(cmd, machineID); + if (count > SET_MON_WAIT_TIMES) { + return FALSE; + } + } else if (resp==1) { /* got ack */ + return TRUE; + } + /* irrevelant resp -- continue */ + } +} + +int find_max_missing_machine(char *flags) +{ + /* flags[] -> which machines are not considered */ + int i, index = -1; + int max = -1; + int threshold; + + for(i=0; i<nTargets; ++i) { + int missing; + /** missing = total_missing_page[i]; **/ + missing = missing_pages[i]; /* for last file (OPEN) or this file (other cmds)*/ + if (missing > max && flags[i] == GOOD_MACHINE) { + max = missing; + index = i; + } + } + + threshold = (get_nPages() > SWITCH_THRESHOLD*100) ? 
+ get_nPages() / 100 : SWITCH_THRESHOLD; + if (index>=0) { + if (monitorID>=0) { + if (flags[monitorID]==BAD_MACHINE) return index; + /** if (max <= (total_missing_page[monitorID] + threshold)) **/ + if (max <= (missing_pages[monitorID] + threshold)) + return monitorID; + } + return index; + } else { /* could come here if all are busy*/ + return -1; + } + + /** + return ((index >= 0) && + (monitorID >= 0) && + (bad_machines[monitorID] == GOOD_MACHINE) && + (max <= (total_missing_page[monitorID] + threshold))) ? + monitorID : index; + **/ +} + +void set_monitor(int mid) +{ + /* one machine is to be set. So need to succeed. */ + if (monitor_cmd(SELECT_MONITOR_CMD, mid)) { + if (verbose >=1) fprintf(stderr, "Monitor - %s\n", id2name(mid)); + return; + } else { + fprintf(stderr, "Fatal: monitor %s cannot be set up!\n", id2name(mid)); + send_done_and_pr_msgs(-1.0, -1.0); + exit(BAD_EXIT); + } +} + +void check_change_monitor(int undesired_index) +{ + /* this function changes the value of the global var: monitorID */ + int i, count; + char * flags; + + /* if all targets received the file, no need to go on */ + if ((count=nNotRecv())==0) return; + + if (count==1) { + i = iNotRecv(); + if (bad_machines[i] == GOOD_MACHINE) { + monitorID = i; + /* + 'i' could be the current monitor. + We'd like to set it because there might + be something wrong with it if we come to + to this point. + */ + set_monitor(monitorID); + } + return; + } + + /* more than two machines do not receive the file yet */ + /* flags mark those machines as BAD which we don't want to consider */ + flags = malloc(nTargets * sizeof(char)); + for(i=0; i<nTargets; ++i) { + flags[i] = (i == undesired_index || file_received[i] == FILE_RECV) ? 
+ BAD_MACHINE : bad_machines[i]; + } + + /* check if all machines are not to be considered */ + count = 0; + for(i=0; i<nTargets; ++i) { + if (flags[i]==BAD_MACHINE) ++count; + } + if (count==nTargets) { + /* Two ways to get here are + (1) during do_one_page: + prev_monitor = (not_recv) and other not_recv's are bad_machines. + (2) during after-ack: + prev_monitor = (recv) and other not_recv's are bad_machines. + */ + free(flags); + set_monitor(monitorID); + return; + } + + /* at least one not_recv (and good) machine is to be considered */ + count = 0; + while (count < nTargets) { + i = find_max_missing_machine(flags); + /* if (i==monitorID) break;*/ + monitorID = i; + + if (monitorID < 0) break; + + if (monitor_cmd(SELECT_MONITOR_CMD, monitorID)) { + if (verbose >=1) fprintf(stderr, "Monitor = %s\n", id2name(monitorID)); + break; + } else { + flags[monitorID] = BAD_MACHINE; + /* Then, we attemp to set up other machine */ + } + ++count; + } + + free(flags); + if (monitorID < 0) { + fprintf(stderr, "Fatal: monitor machine cannot be set up!\n"); + send_done_and_pr_msgs(-1.0, -1.0); + exit(BAD_EXIT); + } +} + +void do_one_page(int page) +{ + int resp; + unsigned long rtt; + refresh_timer(); + start_timer(); + if (!send_page(page)) return; + + /* first ignore all irrelevant resp */ + resp=read_handle_complaint(SENDING_DATA); + while (resp==0) { + resp=read_handle_complaint(SENDING_DATA); + } + + /* read_handle_complaint() waits n*interpage_interval at most */ + if (resp==-1) { + /* delay_sec for readable() is set by set_delay() */ + /****** + at this point, the readable() returns without getting a reply + from monitorID after FACTOR*DT_PERPAGE (or DT_PERPAGE if without_monitor) + ****/ + ++no_feedback_count; + if (verbose>=2) printf("no reply, count = %d\n", no_feedback_count); + update_rtt_hist(999999); + /* register this page as rtt = infinite --- the last element in rtt_hist */ + + if (no_feedback_count > NO_FEEDBACK_COUNT_MAX) { + /* switch to another client */ + if 
(verbose >=2) + fprintf(stderr, + "Consecutive non_feedback exceeds limit, Changing monitor machine.\n"); + /* if (nTargets>1 && (nTargets - nBadMachines()) > 1 && nNotRecv() > 1) */ + check_change_monitor(monitorID); /* replace the current monitor */ + no_feedback_count = 0; + } + return; + } else { /* resp == 1 */ + end_timer(); + update_time_accumulator(); + rtt = get_accumulated_usec(); + /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */ + /* to do: update histogram */ + if (verbose >=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt); + update_rtt_hist(rtt); + + no_feedback_count = 0; + } +} + +void send_cmd_and_wait_ack(int cmd_code) +{ + send_cmd(cmd_code, (int) ALL_MACHINES); + refresh_machine_status(); + /*set_delay(0, FAST);*/ + set_delay(0, DT_PERPAGE*FACTOR); + if (cmd_code==EOF_CMD) mod_machine_status(); + wait_for_ok(cmd_code); + do_badMachines_exit(); + /* check_change_monitor(-1); */ +} + +int do_file_changed_skip() +{ + /* if file is changed during syncing, then we should skip this file */ + if (file_changed || !same_stat_for_file()) { + fprintf(stderr, "WARNING: file is changed during sycing -- skipping\n"); + send_cmd_and_wait_ack(CLOSE_ABORT_CMD); + free_missing_page_flag(); + ++skip_count; + return TRUE; + } + return FALSE; +} + +int main(int argc, char *argv[]) +{ + int c; + int cfile, ctotal_pages, cpage; + char * source_path = NULL; + char * synclist_path = NULL; + char * machine_list_file = NULL; + time_t tloc; + time_t time0, time1, t_page0, t_page; + + while ((c = getopt(argc, argv, "v:w:A:P:T:LI:m:s:f:br:d:Xq")) != EOF) { + switch (c) { + case 'v': + verbose = atoi(optarg); + break; + case 'w': + my_ACK_WAIT_TIMES = atoi(optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'T': + my_TTL = atoi(optarg); + break; + case 'L': + my_LOOP = TRUE; + break; + case 'I': + my_IFname = optarg; + break; + case 'm': + 
machine_list_file = optarg; + break; + case 's': + source_path = optarg; + break; + case 'f': + synclist_path = optarg; + break; + case 'b': + backup = TRUE; /* if nPattern==0, backup means back up ALL files */ + break; + case 'r': /* to selectively back up certain files as defined in the pattern */ + if (!read_backup_pattern(optarg)) { + fprintf(stderr, "Failed in loading regex patterns in file = %s\n", optarg); + exit(BAD_EXIT); + } + break; + case 'd': + pattern_baseDir = strdup(optarg); + if (pattern_baseDir[strlen(pattern_baseDir)-1]=='/') + pattern_baseDir[strlen(pattern_baseDir)-1] = '\0' ; /* remove last / */ + break; + case 'X': + toRmFirst = TRUE; + break; + case 'q': + quitWithOneBad = TRUE; + break; + case '?': + usage(); + exit(BAD_EXIT); + } + } + + if (!machine_list_file || !source_path || !synclist_path ) { + fprintf(stderr, "Essential options (-m -s -f) should be specified. \n"); + usage(); + exit(BAD_EXIT); + } + + if (nPattern>0) backup = TRUE; + if (backup && nPattern>0) { + if (!pattern_baseDir) pattern_baseDir = strdup(source_path); + if (strlen(source_path) < strlen(pattern_baseDir) || + strncmp(source_path, pattern_baseDir, strlen(pattern_baseDir))!=0) { + fprintf(stderr, + "src_path (%s) should include (and be longer than) pattern_baseDir (%s)", + source_path, pattern_baseDir); + exit(BAD_EXIT); + } + } + + if (backup && toRmFirst) { + fprintf(stderr, "-B and -X cannot co-exist\n"); + exit(BAD_EXIT); + } + + get_machine_names(machine_list_file); + if (nTargets==0) { + fprintf(stderr, "No target to sync to\n"); + exit(GOOD_EXIT); + } + + if (!init_synclist(synclist_path, source_path)) exit(BAD_EXIT); + + if (total_entries()==0) { + fprintf(stderr, "Nothing to sync in %s\n", synclist_path); + exit(GOOD_EXIT); + } + + if (verbose >= 2) + fprintf(stderr, "Total number of files: %d\n", total_entries()); + + /* init the network stuff and some flags */ + init_sends(); + init_complaints(); + init_machine_status(nTargets); + + /* set up Cntl_C 
catcher */ + Signal(SIGINT, do_cntl_c); + + /* ------------------- set up monitor machine for doing feedback for each page sent */ + check_change_monitor(-1); + + /*-------------------------------------------------------------------------------*/ + + init_rtt_hist(); + time0 = time(&tloc); /* start time */ + t_page = 0; /* total time for sending pages */ + + /* -----------------------------Send the file one by one -----------------------------------*/ + for (cfile = 1; cfile <= total_entries(); cfile++) { /* for each file to be synced */ + if (!get_next_entry(cfile)) continue; + + ctotal_pages = pages_for_file(); + + /* + By the time this file, which was obtained when synclist was + established some time ago, may no longer exist on the master. + So, we need to check the existence of this file. + fexist() also opens the file so that it won't be deleted + between here and the send-page-loop. + */ + if (ctotal_pages > 0 && (!same_stat_for_file() || + !fexist(current_entry()))) { + /* go to next file if this file has changed or does not exist */ + fprintf(stderr, "%s (%d out of %d; Extinct file)\n", + getFilename(), current_entry(), total_entries()); + adjust_totals(); + continue; + } + + if (ctotal_pages < 0) { + fprintf(stderr, "%s (%d out of %d; to delete)\n", + getFilename(), current_entry(), total_entries()); + } else { + fprintf(stderr, "%s (%d out of %d; %d pages)\n", + getFilename(), current_entry(), total_entries(), ctotal_pages); + } + + /* send_open_cmd */ + pack_open_file_info(); + send_cmd_and_wait_ack(OPEN_FILE_CMD); + + /* + ctotal_pages < 0, for deletion + ctotal_pages = 0, regular file with no content. 
+ or directory, softlink, hardlink + both should have been finished with OPEN_FILE_CMD + */ + if (ctotal_pages <= 0) continue; + + /* for other regular files */ + init_missing_page_flag(ctotal_pages); + refresh_missing_pages(); /* total missing pages for this file for each tar */ + + /* ----- sending file data ----- first round */ + t_page0 = time(&tloc); + no_feedback_count = 0; + for (cpage = 1; cpage <= ctotal_pages; cpage++) { + /* + the mode field and delay may be changed by change_monitor + */ + set_delay(0, DT_PERPAGE*FACTOR); + set_mode(SENDING_DATA); + do_one_page(cpage); + } + + if (do_file_changed_skip()) continue; + + /* send "I am done with the first round" */ + reset_has_missing(); + refresh_file_received(); /* to record machines that have received this file */ + send_cmd_and_wait_ack(EOF_CMD); + + /* after the first run, before we go to 2nd and 3rd run, */ + if (has_missing_pages()) check_change_monitor(-1); + + /* ----- sending file data again, 2nd and 3rd and ...n-th round */ + reset_has_sick(); + while (has_missing_pages()) { + int c; /****************/ + no_feedback_count = 0; + + c = 0; + for (cpage = 1; cpage <= ctotal_pages; cpage++){ + if (is_it_missing(cpage-1)) { + set_delay(0, DT_PERPAGE*FACTOR); + set_mode(RESENDING_DATA); + do_one_page(cpage); + page_sent(cpage-1); + ++c; /*************/ + } + } + if (verbose>=1) + fprintf(stderr, "re-sent N_pages = %d\n", c); /*************/ + + /* eof */ + reset_has_missing(); + send_cmd_and_wait_ack(EOF_CMD); + if (has_sick_machines()) { + break; + /* one machine can reach sick_state while some others are still + in missing_page state. + This break here is ok in terms of skipping this file.*/ + } else { + check_change_monitor(-1); + } + }; + + t_page += (time(&tloc) - t_page0);; + if (do_file_changed_skip()) continue; + + /* close file */ + send_cmd_and_wait_ack((has_sick_machines()) ? 
CLOSE_ABORT_CMD : CLOSE_FILE_CMD); + if (has_sick_machines()) { + fprintf(stderr, "Skip_syncing %s\n", getFilename()); + } + free_missing_page_flag(); + } /* end of the for each_file loop */ + + time1= time(&tloc); + return send_done_and_pr_msgs( ((double)(time1 - time0))/ 60.0, ((double)t_page)/60.0); +} + diff --git a/multicatcher.c b/multicatcher.c new file mode 100644 index 0000000..76fbc5e --- /dev/null +++ b/multicatcher.c @@ -0,0 +1,181 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + verision 3.0 major update + -- large file support + -- platform independence (between linux, unix) + -- backup feature (as in rsync) + -- removing meta-file-info + -- catching slow machine as the feedback monitor + -- mcast options + version 3.0.[1-9] bug fixes + -- logic flaw which under certain condition + caused premature dropout due to + unsuccessful EOF, CLOSE_FILE + and caused unwanranted SIT-OUT cases. + -- tested on Debian 64 bit arch by Nicolas Marot in France + version 3.1.0 + -- codes for IPv6 are ready (but not tested) + IPv4 is tested ok. + version 3.2.0 + -- monitor change improvement + -- handshake improvement (e.g. seq #) + -- if one machine skips a file, all will NOT close() + + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + This file was modified in 2001 from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" + +extern int machineID; +extern int verbose; +extern int isMonitor; +extern char * baseDir; +extern char * cmd_name[]; +extern char * backup_suffix; + +char * my_MCAST_ADDR = MCAST_ADDR; +char * my_IFname = MCAST_IF; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; + +void usage() +{ + fprintf(stderr, + "multicatcher (to receive files from multicaster) version %s\n" + " Option list:\n" + " [ -v <verbose_level 0-2> ]\n" + " -------- essential options ----------------------------------\n" + " -t <destination Dirpath>\n" + " -i <machineID 0 originated id numbers>\n" + " -------- options for backup ---------------------------------\n" + " [ -u <suffix> for backup files if -b is on in multicaster ]\n" + " -------- mcast options --------------------------------------\n" + " [ -A <my_mcast_address default=%s)> **same as for multicaster ]\n" + " [ -P <my_PORT default=%d> **same as for multicaster ]\n" + " [ -I <my_MCAST_IF default=NULL> ]\n", + VERSION, MCAST_ADDR, PORT); +} + +int main(int argc, char *argv[]) +{ + int old_mode; /* hp: from char to int for mode */ + int mode; + int c; + + while ((c = getopt(argc, argv, "v:A:P:t:i:u:I:")) != EOF) { + switch (c) { + case 'v': + verbose = atoi(optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'I': + my_IFname = optarg; + break; + case 't': + baseDir = optarg; + break; + case 'i': + machineID = atoi(optarg); + break; + case 'u': + 
backup_suffix = strdup(optarg); + break; + case '?': + usage(); + exit(BAD_EXIT); + } + } + + if (machineID < 0 || !baseDir) { + fprintf(stderr, "Essential options (-t -i) should be specified. \n"); + usage(); + exit(BAD_EXIT); + } + + fprintf(stderr, "my_pid= %lu\n", getpid()); + + if (!backup_suffix) default_suffix(); + get_tmp_suffix(); /* get a unique tmp_name for the tmp file */ + + init_page_reader(); + init_complaint_sender(); + + /* initialize random numbers */ + srand(time(NULL) + getpid()); + + /* set the timeout for readable() to be about 3 to 6 seconds + Actually, this setting is arbitrary. + The timeout of readable() does not play a role in + the logic flow. + */ + set_delay( 3 + rand() % 6, 0); + mode = old_mode = TEST; + + /* -----------------------The main loop--------------------------- + Multicatcher simply waits for any incoming UDP, + reads and handles it. + If the UDP contains file content, it is placed in the right place. + If the UDP contains an instruction, it is carried out. + + Multicatcher never complains unless being told so. + For example, as it is now, multicatcher does not complain + about the rate of incoming UDP being too fast to handle. + If multicatcher cannot keep up with the speed, it just + loses certain pages in a file which will be reported + later when multicaster requests acknowledgement. + ---------------------------------------------------------------- */ + while(1) { /* loop for all incoming pages */ + if (verbose>=2) + fprintf(stderr, "Starting listen loop with mode %d, old_mode = %d\n", mode, old_mode); + + /* the major task here */ + mode = read_handle_page(); + if (verbose>=2) fprintf(stderr, "new page in mode %d\n", mode); + + if (mode == ALL_DONE_CMD) break; + + /* for debugging purpose */ + if ((old_mode != mode)) { + if (verbose>=2 && mode <= 5) fprintf(stderr, "%s\n", cmd_name[mode]); + } + + /* got no data? 
*/ + if (mode == TIMED_OUT) { + if (verbose>=2) fprintf(stderr, "*"); + } + + old_mode = mode; + } /* end of incoming page loop */ + + if (verbose>=1) fprintf(stderr, "Done!\n"); + return 0; +} diff --git a/page_reader.c b/page_reader.c new file mode 100644 index 0000000..8f349a0 --- /dev/null +++ b/page_reader.c @@ -0,0 +1,426 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+*/ + +#include "main.h" + +/* the following is needed on Sun but not on linux */ +#ifdef _SUN +#include <sys/filio.h> +#endif + +extern int machineID; +extern int verbose; +extern char * my_MCAST_ADDR; /* defined in multicatcher.c */ +extern char * my_IFname; +extern int my_PORT; +extern unsigned int current_file_id; + +int isMonitor; /* flag = if this target machine is a designated monitor */ +int nPage_recv; /* counter for the number of pages received for a file */ +int machineState; /* there are four states during one file transmission */ +int isFirstPage=TRUE; /* flag */ + +/* The followings are used to determine sick condition */ +int current_missing_pages; +int last_missing_pages; +int sick_count; + +/* receive socket */ +int recfd; +#ifndef IPV6 +struct sockaddr_in rec_addr; +#else +struct sockaddr_in6 rec_addr; +#endif + +/* + Receive buffer for storing the data obtained from UDP + The format: + (5*sizeof(int) bytes header) + (PAGE_SIZE data area) + + The header has five int_type (4 bytes) int's. + (1) mode -- for master to give instructions to the target machines. + (2) current file index (starting with 1) + (3) current page index (starting with 1) + (4) bytes that has been sent in this UDP page + (5) total number of pages. + + data_ptr points to the data area. 
+*/ +int *mode_ptr; /* hp: change from char to int */ +int *total_pages_ptr; +int *current_page_ptr; +int *bytes_sent_ptr, *current_file_ptr; +char *data_ptr; +char rec_buf[PAGE_BUFFSIZE]; + +void init_page_reader() +{ + /*struct ip_mreq mreq;*/ + int rcv_size; + + machineState = IDLE_STATE; + isMonitor = FALSE; + + /* Prepare buffer pointers */ + mode_ptr = (int*)rec_buf; + current_file_ptr = (int *)(mode_ptr + 1); + current_page_ptr = (int *)(current_file_ptr + 1); + bytes_sent_ptr = (int *)(current_page_ptr + 1); + total_pages_ptr = (int *)(bytes_sent_ptr + 1); + data_ptr = (char *)(total_pages_ptr + 1); + + /* Set up receive socket */ + if (verbose>=2) fprintf(stderr, "setting up receive socket\n"); + recfd = rec_socket(&rec_addr, my_PORT); + + /* Join the multicast group */ + /*inet_pton(AF_INET, MCAST_ADDR, &(mreq.imr_multiaddr.s_addr)); + mreq.imr_multiaddr.s_addr = inet_addr(my_MCAST_ADDR); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + + if (setsockopt(recfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void*)&mreq, sizeof(mreq)) < 0){ + perror("Joining Multicast Group"); + } + */ + + if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) { + perror("Joining Multicast Group"); + } + + /* Increase socket receive buffer */ + rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE; + if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){ + perror("Expanding receive buffer for page_reader"); + } + +} + +/* + This is the heart of multicatcher. + It parses the incoming pages and do proper reactions according + to the mode (command code) encoded in the first 4 bytes in an UDP page. + It returns the mode. 
+*/ +int read_handle_page() +{ + #ifndef IPV6 + struct sockaddr_in return_addr; + #else + struct sockaddr_in6 return_addr; + #endif + int bytes_read; + socklen_t return_len = (socklen_t)sizeof(return_addr); + int mode_v, bytes_sent_v, total_pages_v, current_file_v, current_page_v; + + /* -----------receiving data -----------------*/ + if (readable(recfd) == 1) { /* there is data coming in */ + /* check_queue(); This might be useful. But need more study. */ + /* get data */ + bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0, + (struct sockaddr *)&return_addr, + (socklen_t*) &return_len); + + bytes_sent_v = ntohl(*bytes_sent_ptr); + if (bytes_read != (bytes_sent_v + HEAD_SIZE)) + return NULL_CMD; + + /* convert from network byte order to host byte order */ + mode_v = ntohl(*mode_ptr); + total_pages_v = ntohl(*total_pages_ptr); + current_file_v = ntohl(*current_file_ptr); + current_page_v = ntohl(*current_page_ptr); + + if (isFirstPage) { + update_complaint_address(&return_addr); + isFirstPage = FALSE; + } + + /* --- process various commands (modes) */ + switch (mode_v) { + case TEST: + /* It is just a test packet? 
*/ + fprintf(stderr, "********** Received test packet **********\n"); + return mode_v; + + case SELECT_MONITOR_CMD: + if (current_page_v == machineID) { + isMonitor = TRUE; + send_complaint(MONITOR_OK, machineID, 0, 0); + } else { + isMonitor = FALSE; + } + return mode_v; + + case OPEN_FILE_CMD: + if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) + && machineState == IDLE_STATE) { + /* get info about this file */ + if (verbose>=1) + fprintf(stderr, "open file id= %d\n", current_file_v); + if (!extract_file_info(data_ptr, current_file_v, total_pages_v)) + return mode_v; + + /* different tasks here */ + /* open file, rmdir, unlink */ + if (total_pages_v < 0) { /* delete a file or a directory */ + if (!delete_file(TRUE)) return mode_v; /* this fx can be re-entered many times */ + /* machineState remains to be IDLE_STATE */ + } else if (total_pages_v == 0) { + /* handle an empty file, or (soft)link or directory */ + if (!check_zero_page_entry()) return mode_v; /* can re-enter many times */ + /* machineState remains to be IDLE_STATE */ + } else { /* a regular file */ + if (!open_file()) return mode_v; + /* the file has been opened. */ + sick_count = 0; + current_missing_pages =0; + last_missing_pages = nPages_for_file(current_file_v); + machineState = GET_DATA_STATE; + } + send_complaint(OPEN_OK, machineID, 0, current_file_id); /* ack */ + nPage_recv = 0; + return mode_v; + } + /* + We must be in GET_DATA_STATE. + OPEN_OK ack has been sent back in the previous block. + However, + the master may not have received the ack. + In that case the master will send back the open_file_cmd again. 
+ */ + if ((current_page_v == (int) ALL_MACHINES ||current_page_v == (int) machineID) + && (machineState == GET_DATA_STATE)) { /****/ + send_complaint(OPEN_OK, machineID, 0, current_file_id); + } + return mode_v; + + case EOF_CMD: + /**********/ + if (verbose>=1) + fprintf(stderr, "***** EOF received for id=%d state=%d id=%d, file=%d\n", + current_page_v, machineState, machineID, current_file_v); + + /* the following happnens when this machine was previously out-of-pace + and was labeled as 'BAD MACHINE' by the master. + The master has proceeded with the syncing process without + waiting for this machine to finish the process in one of the previous files. + Since under normal condition, this machine should not expect to see + current_file changes except when OPEN_FILE_CMD is received. + */ + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + + /* normal condition */ + if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) + && machineState == GET_DATA_STATE) { /* GET_DATA_STATE */ + /* check missing pages and send back missing-page-request */ + current_missing_pages = ask_for_missing_page(); /* = total # of missing_pages */ + + if (current_page_v == (int) machineID) { + /* master is asking for my EOF_ack */ + if (current_missing_pages == 0) { + /* w/o assuming how we get to this point ... 
*/ + machineState = DATA_READY_STATE; + send_complaint(EOF_OK, machineID, 0, current_file_id); + } else { + send_complaint(MISSING_TOTAL, machineID, + current_missing_pages , current_file_id); + missing_page_stat(); /* this has to be done before close_file() ******************/ + } + return mode_v; + } + + /*** + master is asking everyone, (after master sent or re-sent pages) + so, we do some book-keeping procedures (incl state change) + ***/ + if (verbose >=1) + fprintf(stderr, "missing_pages = %d, nPages_received = %d file = %d\n", + current_missing_pages, nPage_recv, current_file_v); /************/ + + nPage_recv = 0; + + if (current_missing_pages == 0) { + /* + There is no missing page. + Change the state. + */ + machineState = DATA_READY_STATE; + send_complaint(EOF_OK, machineID, 0, current_file_id); + return mode_v; + } else { + /* + There are missing pages. + If we still miss many pages for SICK_THRESHOLD consecutive times, + then we are sick. e.g. machine CPU does not give multicatcher + enough time to process incoming UDP's OR the disk I/O is too slow. 
+ */ + + if ((SICK_RATIO)*(double)last_missing_pages < (double)current_missing_pages) { + ++sick_count; + if (sick_count > SICK_THRESHOLD) { + machineState = SICK_STATE; + send_complaint(SIT_OUT, machineID, + 0, current_file_id); /* no more attempt to receive */ + /* master may send more pages from requests from other machines + but this machine will mark this file as 'sits out receiving' */ + } else { + /* not sick enough yet */ + send_complaint(MISSING_TOTAL, machineID, + current_missing_pages, current_file_id); + missing_page_stat(); /* this has to be done before close_file() ******************/ + /* master will send more pages */ + } + } else { /* we are getting enough missing pages this time to keep up */ + sick_count = 0; /* break the consecutiveness */ + send_complaint(MISSING_TOTAL, machineID, + current_missing_pages, current_file_id); + missing_page_stat(); /* this has to be done before close_file() ******************/ + /* master will send more pages */ + } + last_missing_pages = current_missing_pages; + return mode_v; + } + } /* end GET_DATA_STATE */ + + /* After state change, we still get request for ack. 
+ send back ack again */ + if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)) { + switch (machineState) { + case DATA_READY_STATE: + send_complaint(EOF_OK, machineID, + 0, current_file_id); + return mode_v; + case SICK_STATE: + send_complaint(SIT_OUT, machineID, + 0, current_file_id); /* just an ack, even for sick state*/ + return mode_v; + } + } + return mode_v; + + case CLOSE_FILE_CMD: + if (verbose>=1) + fprintf(stderr, "***** CLOSE received for id=%d state=%d id=%d, file=%d\n", + current_page_v, machineState, machineID, current_file_v); + + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + + if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) { + if (machineState == DATA_READY_STATE) { + if (!close_file()) { return mode_v; }; + set_owner_perm_times(); + machineState = IDLE_STATE; + send_complaint(CLOSE_OK, machineID, 0, current_file_id); + return mode_v; + } else if (machineState == IDLE_STATE) { + /* send ack back again because we are asked */ + send_complaint(CLOSE_OK, machineID, 0, current_file_id); + return mode_v; + } else { /* other states -- we should not be here*/ + /* if (machineState == SICK_STATE || machineState == GET_DATA_STATE) */ + /* SICK_STATE --> we are too slow in getting missing pages + if one of the machines is sick, master will send out CLOSE_ABORT + GET_DATA_STATE --> + We are not supposed to be in GET_DATA_STATE, + so consider it a sick_state */ + fprintf(stderr, "*** should not be here -- state=%d\n", machineState); + if (!rm_tmp_file()) { return mode_v; }; + machineState = IDLE_STATE; + send_complaint(SIT_OUT, machineID, 0, current_file_id); + /* make sick_count larger than threshold for GET_DATA_STATE */ + sick_count = SICK_THRESHOLD + 10000; + return mode_v; + } + } + return mode_v; + + case CLOSE_ABORT_CMD: + if (verbose>=1) + fprintf(stderr, "***** CLOSE_ABORT received for id=%d state=%d id=%d, file=%d\n", + current_page_v, machineState, 
machineID, current_file_v); + + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + + if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) { + if (!rm_tmp_file()) { return mode_v; }; + machineState = IDLE_STATE; + send_complaint(CLOSE_OK, machineID, 0, current_file_id); + } + return mode_v; + + case SENDING_DATA: + case RESENDING_DATA: + if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */ + /* + otherwise, go ahead... + */ + if (verbose>=2) { + fprintf(stderr, "Got %d bytes from page %d of %d for file %d mode=%d\n", + bytes_read - HEAD_SIZE, + current_page_v, total_pages_v, + current_file_v + 1, mode_v); + } + + /* timing the disk IO */ + /* start_timer(); */ + + write_page(current_page_v, data_ptr, bytes_read - HEAD_SIZE); + if (isMonitor) send_complaint(PAGE_RECV, machineID, 0, current_file_id); + + /* + end_timer(); + update_time_accumulator(); + */ + + /* Yes, we have just read a page */ + ++nPage_recv; + return mode_v; + + case ALL_DONE_CMD: + /* + clear up the files. + */ + /*** since we do not know if there are other machines + that are NOT in data_ready_state, + to maintain equality, it is best to just + remove tmp_file without close_file() ***/ + rm_tmp_file(); + return mode_v; + + default: + return mode_v; + } /* end of switch */ + } else { + /* No, the read is timed out */ + return TIMED_OUT; + } +} + diff --git a/parse_synclist.c b/parse_synclist.c new file mode 100644 index 0000000..0c2a7aa --- /dev/null +++ b/parse_synclist.c @@ -0,0 +1,320 @@ +/* + Copyright (C) 2006-2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include <stdlib.h> +#include <limits.h> /* to define PATH_MAX */ + +/* + Get the next to-be-synced file from synclist which is the output of + parseRsyncList.C + The latter in turn parses the output from rsync. + In other words, we use rsync in dry-run mode to get the + files that need to be synced. + + to port to large_file environment: use off_t for size +*/ + +extern int verbose; + +char basedir[PATH_MAX]; /* baseDir for the syncing */ +FILE *fd; +struct stat st; /* stat for the current file */ + +unsigned int nEntries; +int cur_entry; /* file id -- <0 if it needs backup */ +unsigned int nPages; +unsigned int last_bytes; /* number of bytes for the last page */ +int toRmFirst = FALSE; /* flag rm existing file and then sync */ +int file_changed = FALSE; /* flag to indicate if file has been changed during syncing */ + +unsigned int total_pages; +off_t total_bytes; + +int isDelete, isHardlink; +char filename[PATH_MAX]; +char fullname[PATH_MAX]; +char linktar[PATH_MAX]; + +int init_synclist(char * synclist_path, char *bdir) +{ + char line[PATH_MAX]; + nEntries = 0; + strcpy(basedir, bdir); + + if ((fd = fopen(synclist_path, "r")) == NULL) { + fprintf(stderr, "Cannot open synclist file = %s \n", synclist_path); + return FAIL; + } + + while (fgets(line, PATH_MAX, fd) != NULL) nEntries++; + if (nEntries == 0) { + fclose(fd); + fprintf(stderr, "Empty entires in synclist file = %s\n", synclist_path); + return SUCCESS; /* OK, nothing to sync */ + } + rewind(fd); + + cur_entry = 0; + 
total_pages = 0; + total_bytes = 0; + + return SUCCESS; +} + +/* + pages_for_file calculates the number (int) of pages for the current file + and returns that number in an (int) type. + So, max_pages = 2**31 = 2147483648 + which corresponds to a file_size of 2**31 * 64512 = 1.38e14 + [ general limit = (1<<(sizeof(int)*8)) * PAGE_SIZE ] + At that time, the type of page_number needs to be upgraded :) + + to_delete -> -1 + normal_file -> number_of_pages + softlink -> 0 + hardlink -> 0 + directory -> 0 +*/ +int pages_for_file() +{ + if (isDelete) { /* to be deleted directory or file */ + return TO_DELETE; + } + + if (S_ISREG(st.st_mode)){ + int n; + if (st.st_nlink > 1) return 0; /* hardlink file */ + + n = (int)((st.st_size)/(off_t)PAGE_SIZE); /* regular file */ + if ((st.st_size)%((off_t)PAGE_SIZE) == 0) { + last_bytes = (unsigned int)(PAGE_SIZE); + return n; + } else { + last_bytes = (unsigned int)(st.st_size - (off_t)n * (off_t)PAGE_SIZE); + return n+1; + } + /*return ((st.st_size)%((off_t)PAGE_SIZE) == 0) ? 
n : n+1 ;*/ + } + if (S_ISLNK(st.st_mode)){ + return 0; /* softlink */ + } + return 0; /* directory */ +} + +off_t bytes_for_file() +{ + return st.st_size; +} + +unsigned int get_nPages() /* for this file */ +{ + return nPages; +} + +void strip(char * str) +{ + /* remove trailing \n and spaces */ + char *pc = &str[strlen(str)-1]; + while (*pc == ' ' || *pc == '\n') { + *(pc--) = '\0'; + } +} + +int same_stat_for_file() +{ + /* check if current stat is same as that when get_next_entry is called */ + struct stat st1; + + if(lstat(fullname, &st1) < 0) { + if (verbose >=1) perror(fullname); + return FAIL; + } + + if (st1.st_size != st.st_size || st1.st_mode != st.st_mode || + st1.st_mtime != st.st_mtime) { + return FAIL; /* the file has changed */ + } + return SUCCESS; + +} + +int is_hardlink_line(char * line) +{ + /* when line is in the form of + string1 string2 + it can be either a filename (string1 string2) + or a hardlink string1 => string2 + */ + struct stat st; + char fn[PATH_MAX]; + strcpy(fn, basedir); + strcat(fn, "/"); + strcat(fn, line); + + /* if the whole line is not a file, then we are dealing with hardlink case */ + return (lstat(fn, &st) < 0); + /* cannot deal with the situation + str1 is a file + str2 is a hardlink to str1 + str1 str2 is a file + AND if we need to sync + str1 and 'str1 str2' at the same time. + But this situation is very rare. 
*/ +} + +int get_next_entry(int current_file_id) +{ + char *c; + char line[PATH_MAX]; + + /* inside this function, cur_entry is set to be positve to facilitate processing */ + cur_entry = current_file_id; /* from main loop's index, starting with 1 */ + + isDelete = FALSE; + isHardlink = FALSE; + + fgets(line, PATH_MAX, fd); + strip(line); + + if (current_file_id == nEntries) { + fclose(fd); /* close the synclist file */ + if (verbose>=2) fprintf(stderr, "no more entry in synclist.\n"); + } + + if (verbose>=2) { + fprintf(stderr, "Got current entry = %d (total= %d)\n", cur_entry, nEntries); + fprintf(stderr, "%s\n", line); + } + + strcpy(fullname, basedir); + strcat(fullname, "/"); + if (strncmp(line, "deleting ", 9)==0) { + isDelete = TRUE; + nPages = -1; + strcat(fullname, &line[9]); + strcpy(filename, &line[9]); + if (needBackup(fullname)) cur_entry = -cur_entry; + return SUCCESS; + } else if ((c = strchr(line, ' '))!=NULL && is_hardlink_line(line)) { + /* is it a hardlink -- two filenames separated by a space */ + char fn[PATH_MAX]; + isHardlink = TRUE; + strncpy(fn, line, (c - line)); + fn[c-line] = '\0'; + strcat(fullname, fn); + strcpy(filename, fn); + strcpy(linktar, c+1); + } else { + /* normal, single entry */ + strcat(fullname, line); + strcpy(filename, line); + } + + /* update stat */ + if(lstat(fullname, &st) < 0) { + if (verbose >=1) perror(fullname); + return FAIL; + } + + if (S_ISLNK(st.st_mode)) { + int linklen; + linklen = readlink(fullname, linktar, PATH_MAX); + /* readlink doesn't null-terminate the string */ + *(linktar + linklen) = '\0'; + } else if (st.st_nlink>1 && !isHardlink) { + /* this is the target file that others (hard)link to. 
+ treat it like a normal file */ + st.st_nlink = 1; + } + + nPages = pages_for_file(); + if (nPages > 0) { /* for regular files */ + total_pages += nPages; + total_bytes += st.st_size; + } + + if (needBackup(fullname)) cur_entry = -cur_entry; + + file_changed = FALSE; + + return SUCCESS; +} + +void adjust_totals() +{ + if (nPages > 0) { + total_pages -= nPages; + total_bytes -= st.st_size; + } +} + +/* some accessors */ +unsigned int total_entries() { return nEntries; } + +int current_entry() { return cur_entry; } + +int is_softlink() { return S_ISLNK(st.st_mode); } + +int is_hardlink() { return isHardlink; } + +int is_directory() { return S_ISDIR(st.st_mode); } + +char * getFilename() { /* relative to basedir */ return &filename[0]; } + +char * getFullname() { return &fullname[0]; } + +/* + The following three fx are used to fill file_info into the send_buffer. + They return the number of bytes being written into the buf, including the \0 byte. +*/ +unsigned int fill_in_stat(char *buf) +{ + /* load into buf area the stat info in ascii format */ + if (isDelete) + sprintf(buf, "0 0 0 0 0 0 0 0"); + else { + #ifdef _LARGEFILE_SOURCE + sprintf(buf, "%u %u %u %u %llu %lu %lu %d", st.st_mode, st.st_nlink, + st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime, + toRmFirst); + #else + sprintf(buf, "%u %u %u %u %lu %lu %lu %d", st.st_mode, st.st_nlink, + st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime, + toRmFirst); + #endif + } + + return strlen(buf)+1; +} + +unsigned int fill_in_filename(char * buf) +{ + strcpy(buf, filename); + return strlen(buf)+1; +} + +unsigned int fill_in_linktar(char *buf) +{ + strcpy(buf, linktar); + return strlen(buf)+1; +} + + @@ -0,0 +1,182 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. 
+ Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+*/ + +#ifndef __main_proto_h +#define __main_proto_h + +/* parse_synclist.c */ +unsigned int total_entries(); +unsigned int fill_in_stat(char *buf); +unsigned int fill_in_linktar(char *buf); +unsigned int fill_in_filename(char * buf); +unsigned int get_nPages(); +int pages_for_file(); +char * getFilename(); +char * getFullname(); +int same_stat_for_file(); +void strip(char * str); +int current_entry(); +int get_next_entry(int current_file_id); +int is_softlink(); +int is_directory(); +int is_hardlink(); +int init_synclist(char * synclist_path, char *bdir); +void adjust_totals(); + +/* backup.c */ +int read_backup_pattern(char * fpat_file); +int needBackup(char * filename); + +/* sends.c */ +void init_sends(); +void set_mode(int new_mode); +int send_page(int page); +void send_test(); +void send_cmd(int code, int machine_id); +void send_all_done_cmd(); +int fexist(int entry) ; +void pack_open_file_info(); +void my_exit(int); + +/* complaints.c */ +void init_complaints(); +int read_handle_complaint(int cmd); +void wait_for_ok(int code); +void refresh_machine_status(); +void refresh_missing_pages(); +void mod_machine_status(); +void refresh_file_received(); +int nNotRecv(); +int iNotRecv(); +int is_it_missing(int page); +int has_missing_pages(); +int has_sick_machines(); +void init_missing_page_flag(int n); +void free_missing_page_flag(); +void refresh_machine_status(); +void init_machine_status(int n); +void page_sent(int page); +int nBadMachines(); +void do_badMachines_exit(); +int pr_missing_pages(); +int send_done_and_pr_msgs(double, double); +void do_cntl_c(int signo); +void set_has_missing(); +void reset_has_missing(); +void set_has_sick(); +void reset_has_sick(); + + +/* setup_socket.c */ +void set_delay(int secs, int usecs); +void get_delay(int * secs, int * usecs); +int readable(int fd); +#ifndef IPV6 +int complaint_socket(struct sockaddr_in *addr, int port); +int send_socket(struct sockaddr_in *addr, char * cp, int port); +int rec_socket(struct sockaddr_in 
*addr, int port); +#else +int rec_socket(struct sockaddr_in6 *addr, int port); +int send_socket(struct sockaddr_in6 *addr, char * cp, int port); +int complaint_socket(struct sockaddr_in6 *addr, int port); +#endif + +/* set_mcast.c */ +int mcast_set_if(int sockfd, const char *ifname, u_int ifindex); +int mcast_set_loop(int sockfd, int onoff); +int mcast_set_ttl(int sockfd, int val); + +/* set_catcher_mcast.c */ +int Mcast_join(int sockfd, const char *mcast_addr, + const char *ifname, u_int ifindex); +void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr); + +/* complaint_sender.c */ +void fill_in_int(int i); +void init_fill_ptr(); +void send_complaint(int complaint, int mid, int page, int file); +void init_complaint_sender(); +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa); +#else +void update_complaint_address(struct sockaddr_in6 *sa); +#endif + +/* page_reader.c */ +void init_page_reader(); +int check_queue(); +int read_handle_page(); + +/* file_operations.c */ +void get_tmp_suffix(); +int extract_file_info(char * buf, int n_file, unsigned int n_pages); +int open_file(); +int close_file(); +int rm_tmp_file(); +int delete_file(int to_check_dir_type); +int touch_file(); +int nPages_for_file(); +int has_all_pages(); +int ask_for_missing_page(); +void missing_page_stat(); +void write_page(int page, char* data_ptr, int bytes); +int is_missing(int page); +void page_received(int page); +int set_owner_perm_times(); +void close_last_file(); +int check_zero_page_entry(); +void default_suffix(); + +/* timing */ +void refresh_timer(); +double get_accumulated_time(); +void start_timer(); +void end_timer(); +void update_time_accumulator(); +double get_accumulated_usec(); +void update_rtt_hist(unsigned int rtt); +void pr_rtt_hist(); +void init_rtt_hist(); +unsigned int pages_wo_ack(); + +/* signal.c */ +typedef void Sigfunc(int); /* for signal handlers */ +Sigfunc * Signal(int signo, Sigfunc *func); +int Fcntl(int fd, int cmd, int arg); 
+int Ioctl(int fd, int request, void *arg); +void Sigemptyset(sigset_t *set); +void Sigaddset(sigset_t *set, int signo); +void Sigprocmask(int how, const sigset_t *set, sigset_t *oset); + +/* id_map.c */ +void get_machine_names(char * filename); +char * id2name(int id); + +#endif @@ -0,0 +1,258 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
/*
  get_dirname_of_rtt stores in dname the directory part of rtt_path
  (normally argv[0]).  If that directory is ".", dname is replaced by the
  current working directory so the path is also meaningful on the remote
  machine.

  dname    -- output buffer; must hold at least PATH_MAX bytes
  rtt_path -- path used to invoke rtt; it is not modified

  Over-long input is truncated instead of overflowing the buffers, and
  dname is left as "." if the working directory cannot be determined.
*/
void get_dirname_of_rtt(char *dname, char *rtt_path)
{
  char path[PATH_MAX];
  char cwd[PATH_MAX];

  /* dirname() may change its argument, so work on a bounded private copy */
  snprintf(path, sizeof(path), "%s", rtt_path);
  snprintf(dname, PATH_MAX, "%s", dirname(path));

  /* getcwd() returns NULL on failure; never hand that to a string copy */
  if (strcmp(dname, ".") == 0 && getcwd(cwd, sizeof(cwd)) != NULL) {
    snprintf(dname, PATH_MAX, "%s", cwd);
  }
}
+ */ + if (verbose) fprintf(stderr, "no ack for page = %d\n", page); + ++no_feedback_count; + update_rtt_hist(999999); /* register this as rtt = infinite --- the last element in rtt_hist */ + if (no_feedback_count>NO_FEEDBACK_COUNT_MAX) { /* count the consecutive no_feedback event */ + /* switch to another client */ + fprintf(stderr, "Consecutive non_feedback exceeds limit. Continue with next page...\n"); + no_feedback_count = 0; + } + } else { + end_timer(); + update_time_accumulator(); + rtt = get_accumulated_usec(); + /* to do: wait additional time after receiving feedback: usleep( rtt * 0.1 ); */ + /* to do: update histogram */ + if (verbose>=2) printf("rtt(p = %d) = %ld (usec)\n", page, rtt); + update_rtt_hist(rtt); + + no_feedback_count = 0; + } +} + +int invoke_catcher(char * machine) +{ + FILE *ptr; + char buf[PATH_MAX]; + + /* invoke rttcatcher on remote machine */ + fprintf(stderr, "using %s to invoke rttcatcher on %s\n", reshell, machine); + + /* check if rsh (ssh) works */ + sprintf(buf, "%s %s date", reshell, machine); + if (verbose) fprintf(stderr, "%s\n", buf); + if (system(buf)) { + fprintf(stderr, "cannot do rsh to the target machine = %s\n", machine); + exit(BAD_EXIT); + } + + if (!my_IFname) { + sprintf(buf, + "%s %s '%s/rttcatcher -A %s -P %d < /dev/null 1>/dev/null 2>/dev/null & echo $!'", + reshell, machine, catcher_path, my_MCAST_ADDR, my_PORT); + } else { + sprintf(buf, + "%s %s '%s/rttcatcher -A %s -P %d -I %s < /dev/null 1>/dev/null 2>/dev/null & echo $!'", + reshell, machine, catcher_path, my_MCAST_ADDR, my_PORT, my_IFname); + } + + + fprintf(stderr, "%s\n", buf); + if ((ptr = popen(buf, "r")) == NULL) { + fprintf(stderr, "Failure to invoke rttcather\n"); + exit(-1); + } + fgets(buf, PATH_MAX, ptr); + pclose(ptr); + + return atoi(buf); +} + +int main(int argc, char *argv[]) +{ + int c; + int nPages =-1, pageSize=-1, ipage; + + verbose = 0; + catcher_path[0] = '\0'; + + while ((c = getopt(argc, argv, "vm:s:n:r:p:A:P:T:LI:")) != EOF) { + 
switch (c) { + case 'v': + verbose = 1; + break; + case 'r': + reshell = optarg; + break; + case 'p': + strcpy(catcher_path, optarg); + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'T': + my_TTL = atoi(optarg); + break; + case 'L': + my_LOOP = TRUE; + break; + case 'I': + my_IFname = optarg; + break; + case 'n' : + nPages = atoi(optarg); + break; + case 'm': + machine = optarg; + break; + case 's': + pageSize = atoi(optarg); + if (pageSize>MAX_PAGE_SIZE) { + usage(); + exit(-1); + } + break; + case '?': + usage(); + exit(-1); + } + } + + if (strlen(catcher_path)==0) { + get_dirname_of_rtt(catcher_path, argv[0]); + } + + + if (nPages == -1 || pageSize == -1 || machine == NULL) { + fprintf(stderr, "Essential options (-n -m -s) should be specified. \n"); + usage(); + exit(-1); + } + + /* init */ + init_sends(pageSize); + init_complaints(); + + /* set up Cntl_C catcher */ + Signal(SIGINT, do_cntl_c); + + remote_pid = invoke_catcher(machine); + fprintf(stderr, "remote pid = %d\n", remote_pid); + + sleep(1); + + /* -------------------Send data-------------------------------------- */ + + init_missing_page_flag(nPages); + + send_cmd(START_CMD, nPages); + refresh_machine_status(); + wait_for_ok(START_CMD); + do_badMachines_exit(machine, remote_pid); + + fprintf(stderr, "Sending data...\n"); + /* send pages */ + set_mode(SENDING_DATA); + set_delay(0, DT_PERPAGE*FACTOR); + + no_feedback_count = 0; + for (ipage = 0; ipage < nPages; ipage++){ + do_one_page(ipage); + } + + send_cmd(EOF_CMD, 0); + refresh_machine_status(); + wait_for_ok(EOF_CMD); + do_badMachines_exit("", -1); + + /* -----------------end of send data -------------------------------- */ + + free_missing_page_flag(); + send_done_and_pr_msgs(); + return 0; +} + diff --git a/rttcatcher.c b/rttcatcher.c new file mode 100644 index 0000000..1af74f1 --- /dev/null +++ b/rttcatcher.c @@ -0,0 +1,118 @@ +/* + Copyright (C) 2006 
Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Codes in this file are extracted and modified from multicatcher.c. + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +char * my_MCAST_ADDR = MCAST_ADDR; +int my_FLOW_PORT = FLOW_PORT; +int my_PORT = PORT; +char * my_IFname = MCAST_IF; + +void usage() +{ + fprintf(stderr, + "rttcatcher (to receive pages on the target. 
version %s\n" + " Option list:\n" + " [ -v flag to turn on verbose]\n" + " -------- mcast options --------------------------------------\n" + " [ -A <my_mcast_address default=%s)> ]\n" + " [ -P <my_PORT default=%d> ]\n" + " [ -I <my_MCAST_IF default=NULL> ]\n", + VERSION, MCAST_ADDR, PORT); +} + +int main(int argc, char *argv[]) +{ + int old_mode; /* hp: from char to int for mode */ + int mode; + int c; + + verbose = 0; + while ((c = getopt(argc, argv, "vA:P:I:")) != EOF) { + switch (c) { + case 'v': + verbose = 1; + break; + case 'A': + my_MCAST_ADDR = optarg; + break; + case 'P': + my_PORT = atoi(optarg); + my_FLOW_PORT = my_PORT -1; + break; + case 'I': + my_IFname = optarg; + break; + case '?': + usage(); + exit(-1); + } + } + + init_page_reader(); + init_complaint_sender(); + + /* initialize random numbers */ + srand(time(NULL) + getpid()); + + /* Wait forever if necessary for first packet */ + set_delay(0, -1); + mode = old_mode = TEST; /* hp: add mode */ + + while(1) { /* loop for all incoming pages */ + if (verbose) + fprintf(stderr, "Starting listen loop with mode %d\n", mode); + + mode = read_handle_page(); + if (verbose) fprintf(stderr, "in mode %d\n", mode); + + if (mode == ALL_DONE_CMD) break; + + /* got no data? */ + if (mode == TIMED_OUT) { + if (verbose) fprintf(stderr, "*"); + continue; + } /* end if TIMED_OUT */ + + /* changing modes? */ + if ((old_mode != SENDING_DATA) && (mode == SENDING_DATA)){ + /* Taking data, wait at least 3 to 8 seconds */ + set_delay( 3 + rand() % 8, 200); + if (verbose) fprintf(stderr, "Receiving data\n"); + old_mode = mode; + continue; + } + + /* all other modes */ + old_mode = mode; + + } /* end of incoming page loop */ + + if (verbose) fprintf(stderr, "Done!\n"); + return 0; +} + + diff --git a/rttcomplaint_sender.c b/rttcomplaint_sender.c new file mode 100644 index 0000000..20d7cda --- /dev/null +++ b/rttcomplaint_sender.c @@ -0,0 +1,103 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. 
+ main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Codes in this file are extracted and modified from complaint_sender.c + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +/* send socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +extern int my_FLOW_PORT; + +/* send buffer */ +char complaint_buffer[FLOW_BUFFSIZE]; +int *ccode_ptr; /* change from char to int -- mem alignment */ +int *cpage_ptr; + +/*---------------------------------------------------------- + init_complaint_sender initializes the buffer to allow the + catcher to send complaints back to the sender. 
+ + ret_address of sender to whom we will complain + is determined when we receive the first UDP data + in read_handle_page() in page_reader.c + ----------------------------------------------------------*/ +void init_complaint_sender() +{ + if (verbose) + fprintf(stderr, "in init_complaint_sender\n"); + + /* init the send_socket */ + complaint_fd = complaint_socket(&complaint_addr, my_FLOW_PORT); + + ccode_ptr = (int *) complaint_buffer; + cpage_ptr = (int *)(ccode_ptr + 1); +} + +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin_addr); +} +#else +void update_complaint_address(struct sockaddr_in6 *sa) +{ + sock_set_addr((struct sockaddr *) &complaint_addr, + sizeof(complaint_addr), (void*)&sa->sin6_addr); +} +#endif + +/*------------------------------------------------------------------------ + send_complaint fills the complaint buffer and send it through our socket + back to the sender + + The major use is to tell master machine which page of which file + needs to be re-transmitted. + complaint -- the complain code defined in main.h + file -- the file index + page -- page index + ------------------------------------------------------------------------*/ +void send_complaint(int complaint, int page) +{ + /* fill in the complaint data */ + /* 20060323 add converting to network byte-order before sending out */ + *ccode_ptr = htonl(complaint); + *cpage_ptr = htonl(page); + + /* send it */ + if( sendto(complaint_fd, complaint_buffer, FLOW_BUFFSIZE, 0, + (const struct sockaddr *)&complaint_addr, + sizeof(complaint_addr)) < 0) { + perror("Sending complaint\n"); + } + if (verbose) + printf("Sent complaint:code=%d page=%d\n", complaint, page); +} diff --git a/rttcomplaints.c b/rttcomplaints.c new file mode 100644 index 0000000..4f9ed9b --- /dev/null +++ b/rttcomplaints.c @@ -0,0 +1,270 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. 
+ main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + codes in this file are extracted and modified from complaints.c + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" +#include <sys/times.h> + +/* buffer for receiving complaints */ + +char flow_buff[FLOW_BUFFSIZE]; +int *code_ptr; /* What's wrong? 
*/ +int *page_ptr; /* Which page */ + +/* receive socket */ +int complaint_fd; +#ifndef IPV6 +struct sockaddr_in complaint_addr; +#else +struct sockaddr_in6 complaint_addr; +#endif + +extern int my_FLOW_PORT; + +/* status */ +char *missing_page_flag=NULL; /* arrary of size nPages -- dep on the files */ +int total_missing_page = 0; +char machine_status = NOT_READY; +int nMachines = 1; +int nPages; +char *machine_list_file; + +extern char * machine; +extern int remote_pid; +extern char * reshell; + +/* + init_complaints initializes our buffers to receive complaint information + from the catchers +*/ +void init_complaints () +{ + if (verbose) + fprintf(stderr, "in init_complaints with FLOW_BUFFSIZE = %d\n", FLOW_BUFFSIZE); + + /* Buffer */ + code_ptr = (int *)flow_buff; + page_ptr = (int *)(code_ptr + 1); + + /* Receive socket */ + if (verbose) printf("set up receive socket for complaints\n"); + complaint_fd = rec_socket(&complaint_addr, my_FLOW_PORT); +} + +void init_missing_page_flag(int n) +{ + int i; + nPages = n; + if ((missing_page_flag = malloc(n * sizeof(char)))==NULL) { + fprintf(stderr, "Cannot malloc(%d * sizeof(char))\n", n); + perror("error = "); + exit(-1); + } + for(i=0; i<nPages; ++i) { + missing_page_flag[i] = RECEIVED; + } +} + +void page_sent(int page) +{ + missing_page_flag[page] = RECEIVED; +} + +void free_missing_page_flag() +{ + free(missing_page_flag); + missing_page_flag = NULL; +} + + +void refresh_machine_status() +{ + machine_status = NOT_READY; +} + +int get_total_missing_pages() +{ + return total_missing_page; +} + +int read_handle_complaint() +{ + /* + return 1 for receiving complaint + return 0 for no complaint handled + */ + int code_v, page_v, bytes_read; + + if (readable(complaint_fd)) { + + /* There is a complaint */ + bytes_read = recvfrom(complaint_fd, flow_buff, FLOW_BUFFSIZE, 0, NULL, NULL); + + /* 20060323 deal with big- vs little-endian issue + convert incoming integers into host representation */ + + if (bytes_read != 
FLOW_BUFFSIZE) return 0; + + code_v = ntohl(*code_ptr); + page_v = ntohl(*page_ptr); + + switch (code_v) { + case PAGE_RECV: + return 1; + case START_OK : + case EOF_OK : + machine_status = MACHINE_OK; + return 1; + case MISSING_PAGE : + if (page_v > nPages) return 1; /* *page_ptr = page # (1 origin) */ + ++(total_missing_page); + missing_page_flag[(page_v)-1] = MISSING; + return 1; + case LAST_MISSING : + if (page_v > nPages) return 1; + ++(total_missing_page); + missing_page_flag[page_v-1] = MISSING; + machine_status = MACHINE_OK; + return 1; + default : + printf("Unknown complaint: %d\n", code_v); + return 0; + } /* end of switch */ + } /* end of if(readable) */ + + return 0; +} + +int all_machine_ok() +{ + return (machine_status == NOT_READY ) ? 0 : 1; +} + +void wait_for_ok(int code) +{ + int i, count; + time_t tloc; + time_t rtime0, rtime1; + + rtime0 = time(&tloc); /* reference time */ + + count = 0; + while (!all_machine_ok()) { + if (read_handle_complaint()==1) { /* if there is a complaint handled */ + rtime0 = time(&tloc); /* reset the reference time */ + continue; + } + /* no complaints handled */ + rtime1 = time(&tloc); /* time since last complaints */ + if ((rtime1-rtime0) >= ACK_WAIT_PERIOD) { + ++count; + if (count < ACK_WAIT_TIMES) { + fprintf(stderr, "%d: resend cmd(%d) to machines:[ ", count, code); + for(i=0; i<nMachines; ++i) { + if (machine_status == NOT_READY) { + fprintf(stderr, "%d ", i); + send_cmd(code, (int) i); + usleep(FAST); + } + } + fprintf(stderr, "]\n"); + rtime0 = rtime1; + } else { + fprintf(stderr, "Time out for the 'bad' machines:[ "); + for(i=0; i<nMachines; ++i) { + if (machine_status== NOT_READY) { + fprintf(stderr, "%d ", i); + } + } + fprintf(stderr, "]\n"); + break; + } + } + } +} + +int is_it_missing(int page) +{ + return (missing_page_flag[page]==MISSING) ? 
1 : 0; +} + +int has_missing_pages() +{ + int i; + for(i=0; i<nPages; ++i) { + if (missing_page_flag[i] == MISSING) { + return 1; + } + } + return 0; +} + +void pr_missing_pages() +{ + int N; + + N = get_total_missing_pages(); + fprintf(stderr, "Missing pages = %10d (%5.2f%%) out of total pages = %d\n", + N, (double)N/((double)nPages)*100.0, nPages); +} + +void send_done_and_pr_msgs() +{ + send_all_done_cmd(); + pr_missing_pages(); + pr_rtt_hist(); +} + +void kill_pid() +{ + char cmd[100]; + /* kill remote process in case where remote machine is not in the multicast network */ + sprintf(cmd, "%s %s 'kill -9 %d'", reshell, machine, remote_pid); + fprintf(stderr, "To make sure we clean up the process on remote machine,\n"); + fprintf(stderr, "%s\n", cmd); + system(cmd); +} + +void do_cntl_c(int signo) +{ + fprintf(stderr, "Control_C interrupt detected!\n"); + send_done_and_pr_msgs(); + kill_pid(); + exit(-1); +} + +/* to do some cleanup before exit IF all machines are bad */ +void do_badMachines_exit(char* machine, int pid) +{ + if (machine_status != NOT_READY) return; + fprintf(stderr, "Remote machine is bad. Exit!\n"); + send_done_and_pr_msgs(); + + if (pid > 0) { + kill_pid(); + } + + exit(-1); +} diff --git a/rttmain.h b/rttmain.h new file mode 100644 index 0000000..78eb693 --- /dev/null +++ b/rttmain.h @@ -0,0 +1,126 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef __main_h +#define __main_h + +#include <time.h> +#include <utime.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include <stdio.h> +#include <stdlib.h> +#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */ +#include <arpa/inet.h> /* inet(3) functions */ +#include <errno.h> +#include <fcntl.h> /* for nonblocking */ +#include <sys/ioctl.h> +#include <netdb.h> +#include <signal.h> +#include <stdio.h> +#include <string.h> +#include <sys/stat.h> /* for S_xxx file mode constants */ +#include <sys/uio.h> /* for iovec{} and readv/writev */ +#include <unistd.h> +#include <sys/wait.h> +#include <sys/time.h> /* timeval{} for select() */ + +#define VERSION "3.1.0" + +/* logic values */ +#define FALSE 0 +#define TRUE 1 +#define FAIL (FALSE) +#define SUCCESS (TRUE) +#define GOOD_EXIT 0 +#define BAD_EXIT -1 + +/* Ports and addresses */ +#define PORT 7900 /* for multicast */ +#define FLOW_PORT (PORT-1) /* for flow-control */ +#define MCAST_ADDR "239.255.67.200" +#define MCAST_TTL 1 +#define MCAST_LOOP FALSE +#define MCAST_IF NULL + +#define REMOTE_SHELL "rsh" + +#define NO_FEEDBACK_COUNT_MAX 5 +#define USEC_TO_IDLE 1000000 + +/* Speed stuff */ +#define FAST 100 /* usec */ +#define DT_PERPAGE 8000 /* usec time interval between pages */ +#define FACTOR 50 + +/* time for the master to wait for the acknowledgement */ +#define ACK_WAIT_PERIOD 1 /* secs (from time()); */ +#define ACK_WAIT_TIMES 60 /* wait for this many periods */ + +/* complaints */ +#define TOO_FAST 
100 +#define SEND_AGAIN 200 +#define START_OK 300 +#define MISSING_PAGE 500 +#define LAST_MISSING 600 +#define EOF_OK 700 +#define PAGE_RECV 800 + +#define FLOW_BUFFSIZE (2 * sizeof(int)) + +#define PAGE_SIZE 64512 /* max page_size allowed */ +#define HEAD_SIZE (3 * sizeof(int)) +#define PAGE_BUFFSIZE (PAGE_SIZE + HEAD_SIZE) +#define TOTAL_REC_PAGE 20 /* 31 20 */ + +/* Modes */ +#define TIMED_OUT 0 +#define TEST 1 +#define SENDING_DATA 2 +#define RESENDING_DATA 3 +#define START_CMD 4 +#define EOF_CMD 5 +#define ALL_DONE_CMD 6 +#define NULL_CMD 7 + +/* machine status */ +#define MACHINE_OK '\1' +#define NOT_READY '\0' + +/* PAGE STATUS */ +#define MISSING '\0' +#define RECEIVED '\1' + +/* MACHINE STATE */ +#define IDLE_STATE 0 +#define GET_DATA_STATE 1 +#define DATA_READY_STATE 2 + +#define MAX_PAGE_SIZE 64512 + +int verbose; + +#include "rttproto.h" + +#endif diff --git a/rttmissings.c b/rttmissings.c new file mode 100644 index 0000000..774e8b2 --- /dev/null +++ b/rttmissings.c @@ -0,0 +1,93 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+*/ + +#include "rttmain.h" + +char * missingPages = NULL; /* array of flags */ +int nPages; + +int init_missingPages(int n) +{ + int i; + + nPages = n; + + /* init missingPages flags */ + if (missingPages != NULL) free(missingPages); /* for second round */ + missingPages = malloc(sizeof(char) * nPages); + for(i=0; i < nPages; ++i) + missingPages[i] = MISSING; + + return 0; +} + +int get_total_pages() +{ + return nPages; +} + +int missing_pages() +{ + int result; + int i; + + result = 0; + + for(i=0; i < nPages; ++i) + if ((missingPages[i]) == MISSING) ++result; + return result; +} + +int is_missing(int page) +{ + return (missingPages[page] == MISSING) ? 1 : 0; +} + +void page_received(int page) +{ + missingPages[page] = RECEIVED; +} + +int ask_for_missing_page() +{ + int i; + int n, count; + + n = missing_pages(); + if (n == 0) { + /* send_complaint(EOF_OK, machineID, 0); */ + return 0; /* nothing is missing */ + } + + count = 0; + for(i=0; i < nPages; ++i) { + if ( missingPages[i] == MISSING ) { + ++count; + send_complaint((count==n) ? LAST_MISSING : MISSING_PAGE, i+1); + usleep(DT_PERPAGE); + } + } + + return 1; /* there is something missing */ +} + diff --git a/rttpage_reader.c b/rttpage_reader.c new file mode 100644 index 0000000..35940e5 --- /dev/null +++ b/rttpage_reader.c @@ -0,0 +1,188 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Codes in this file are extraced and modified from page_reader.c + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "rttmain.h" + +/* the following is needed on Sun but not on linux */ +#ifdef _SUN +#include <sys/filio.h> +#endif + +extern char * my_MCAST_ADDR; /* defined in rttcatcher.c */ +extern int my_PORT; +extern char * my_IFname; +int machineState; +int isFirstPage = TRUE; + +int recfd; +#ifndef IPV6 +struct sockaddr_in rec_addr; +#else +struct sockaddr_in6 rec_addr; +#endif + +int *mode_ptr; /* change from char to int */ +int *total_bytes_ptr; +int *current_page_ptr; +char *data_ptr; +char rec_buf[PAGE_BUFFSIZE]; + +void init_page_reader() +{ + struct ip_mreq mreq; + int rcv_size; + + machineState = IDLE_STATE; + + /* Prepare buffer */ + mode_ptr = (int*)rec_buf; /* hp: add the cast (int *) */ + current_page_ptr = (int *)(mode_ptr + 1); + total_bytes_ptr = (int *)(current_page_ptr + 1); + data_ptr = (char *)(total_bytes_ptr + 1); + + /* Set up receive socket */ + if (verbose) fprintf(stderr, "setting up receive socket\n"); + recfd = rec_socket(&rec_addr, my_PORT); + + /* Join the multicast group */ + if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) { + perror("Joining Multicast Group"); + } + + /* Increase socket receive buffer */ + rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE; + if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){ + perror("Expanding receive buffer"); + } +} + + +/* + This is the heart of catcher. + It parses the incoming pages and do proper reactions according + to the mode (command code) encoded in the first 4 bytes in an UDP page. 
+ It returns the mode. + + Note: since rtt is intended to deal with one-to-one machine, + the four-state engine as in page_reader is not used. +*/ +int read_handle_page() +{ + #ifndef IPV6 + struct sockaddr_in return_addr; + #else + struct sockaddr_in6 return_addr; + #endif + + int bytes_read; + socklen_t return_len = (socklen_t)sizeof(return_addr); + int mode_v, total_bytes_v, current_page_v; + + /* -----------receiving data -----------------*/ + if (readable(recfd) == 1) { /* there is data coming in */ + /* get data */ + bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0, + (struct sockaddr *)&return_addr, + (socklen_t*) &return_len); + + total_bytes_v = ntohl(*total_bytes_ptr); + if (bytes_read != total_bytes_v) return NULL_CMD; + + /* convert from network byte order to host byte order */ + mode_v = ntohl(*mode_ptr); + current_page_v = ntohl(*current_page_ptr); + + if (isFirstPage) { + update_complaint_address(&return_addr); + isFirstPage = FALSE; + } + + /* get init wish list and return address first time only + if (firstTime){ + if (verbose) + fprintf(stderr, "Initializing complaint_sender and wish_list\n"); + init_complaint_sender(&return_addr); + firstTime = 0; + } + */ + + /* --- process various commands */ + switch (mode_v) { + case TEST: + /* It is just a test packet? */ + fprintf(stderr, "********** Received test packet **********\n"); + return mode_v; + + case START_CMD: + if (verbose) + fprintf(stderr, "Start cmd received ---\n"); + init_missingPages(current_page_v); /* Here: use current_page for total_pages */ + send_complaint(START_OK, 0); + return mode_v; + + case EOF_CMD: + if (verbose) + fprintf(stderr, "Check and ask for missing pages ---\n"); + + if (ask_for_missing_page()==0) { + /* + There is no missing page. + */ + send_complaint(EOF_OK, 0); + } + /* + else + There are missing pages. 
+ Ack has been done in ask_for_missing_page() + */ + + return mode_v; + + case SENDING_DATA: + case RESENDING_DATA: + if (verbose){ + fprintf(stderr, "Got %d bytes from page %d of %d, mode=%d\n", + bytes_read - HEAD_SIZE, + current_page_v, get_total_pages(), mode_v); + } + + /* mode == SENDING_DATA, RESENDING_DATA */ + page_received(current_page_v); + send_complaint(PAGE_RECV, 0); + + /* Yes, we read a page */ + return mode_v; + case ALL_DONE_CMD: /* this is presumably a good machine */ + default: + return mode_v; + } /* end of switch */ + } else { + /* No, the read timed out */ + return TIMED_OUT; + } +} + + diff --git a/rttproto.h b/rttproto.h new file mode 100644 index 0000000..6d4cf62 --- /dev/null +++ b/rttproto.h @@ -0,0 +1,116 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+*/ + +#ifndef __rttproto_h +#define __rttproto_h + +/* rttsends */ +void init_sends(int n); +void set_mode(int new_mode); +int send_page(int page); +void send_cmd(int code, int machine_id); +void send_all_done_cmd(); + +/* rttcomplaints */ +void init_complaints(); +int read_handle_complaint(); +void wait_for_ok(int code); +void refresh_machine_status(); +int is_it_missing(int page); +int has_missing_pages(); +void init_missing_page_flag(int n); +void free_missing_page_flag(); +void refresh_machine_status(); +int get_total_missing_pages(); +void page_sent(int page); +void pr_missing_pages(); +void do_cntl_c(int signo); +void send_done_and_pr_msgs(); +void do_badMachines_exit(char * machine, int pid); + +/* setup_socket.c */ +void set_delay(int secs, int usecs); +int readable(int fd); +#ifndef IPV6 +int complaint_socket(struct sockaddr_in *addr, int port); +int send_socket(struct sockaddr_in *addr, char * cp, int port); +int rec_socket(struct sockaddr_in *addr, int port); +#else +int rec_socket(struct sockaddr_in6 *addr, int port); +int send_socket(struct sockaddr_in6 *addr, char * cp, int port); +int complaint_socket(struct sockaddr_in6 *addr, int port); +#endif + +/* set_mcast.c */ +int mcast_set_if(int sockfd, const char *ifname, u_int ifindex); +int mcast_set_loop(int sockfd, int onoff); +int mcast_set_ttl(int sockfd, int val); + +/* set_catcher_mcast.c */ +int Mcast_join(int sockfd, const char *mcast_addr, + const char *ifname, u_int ifindex); +void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr); + +/* rttcomplaint_sender */ +void send_complaint(int complaint, int page); +void init_complaint_sender(); +#ifndef IPV6 +void update_complaint_address(struct sockaddr_in *sa); +#else +void update_complaint_address(struct sockaddr_in6 *sa); +#endif + +/* rttpage_reader */ +void init_page_reader(); +int read_handle_page(); + +/* rttmissings */ +int init_missingPages(int n); +int missing_pages(); +int is_missing(int page); +void page_received(int 
page); +int ask_for_missing_page(); +int get_total_pages(); + +/* timing */ +void refresh_timer(); +double get_accumulated_time(); +void start_timer(); +void end_timer(); +void update_time_accumulator(); +double get_accumulated_usec(); +void update_rtt_hist(unsigned int rtt); +void pr_rtt_hist(); +void init_rtt_hist(); + +/* signal.c */ +typedef void Sigfunc(int); /* for signal handlers */ +Sigfunc * Signal(int signo, Sigfunc *func); +int Fcntl(int fd, int cmd, int arg); +int Ioctl(int fd, int request, void *arg); +void Sigemptyset(sigset_t *set); +void Sigaddset(sigset_t *set, int signo); +void Sigprocmask(int how, const sigset_t *set, sigset_t *oset); + +#endif diff --git a/rttsends.c b/rttsends.c new file mode 100644 index 0000000..7038e2c --- /dev/null +++ b/rttsends.c @@ -0,0 +1,144 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + Copyright (C) 2001 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + codes in this file are extracted and modified from sends.c + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+*/ + +#include "rttmain.h" +#include <net/if.h> +#ifdef _SUN +#include <sys/sockio.h> /* define SIOCGIFADDR */ +#else +#include <linux/sockios.h> +#endif + +extern char * my_MCAST_ADDR; +extern int my_PORT; +extern int my_TTL; +extern int my_LOOP; +extern char * my_IFname; + +/* buffer for sending (same structure as those in sends.c */ +int *mode_ptr; /* char type would cause alignment problem on Sparc */ +int *current_page_ptr; +int *total_bytes_ptr; +char *data_ptr; +char send_buff[PAGE_BUFFSIZE]; + +int pageSize; + +/* Send socket */ +int send_fd; +#ifndef IPV6 +struct sockaddr_in send_addr; +#else +struct sockaddr_in6 send_addr; +#endif + +/* + set_mode sets the caster into a new mode. + modes are defined in main.h: +*/ +void set_mode(int new_mode) +{ + *mode_ptr = htonl(new_mode); +} + + +/* init_sends initializes the send buffer */ +void init_sends(int npagesize) +{ + pageSize = (npagesize>PAGE_SIZE) ? PAGE_SIZE : npagesize; + + mode_ptr = (int *)send_buff; /* hp: add (int*) */ + current_page_ptr = (int *) (mode_ptr + 1); + total_bytes_ptr = (int *)(current_page_ptr + 1); + data_ptr = (char *)(total_bytes_ptr + 1); + + send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT); + + /******* change MULTICAST_IF ********/ + if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0) + perror("init_sends(): when set MULTICAST_IF\n"); + + /* set multicast_ttl such that UDP can go to 2nd subnetwork */ + if (mcast_set_ttl(send_fd, my_TTL) < 0) + perror("init_sends(): when set MULTICAST_TTL\n"); + + /* disable multicast_loop such that there is no echo back on master */ + if (mcast_set_loop(send_fd, my_LOOP) < 0) + perror("init_sends(): when set MULTICAST_LOOP\n"); + + /* put dummy contents into send (UDP) buffer */ + memset(data_ptr, 1, PAGE_SIZE); +} + +/* + send_buffer will send the buffer with the file information + out to the socket connection with the catcher. 
+*/ +int send_buffer(int bytes_read) +{ + /* Else send the data */ + if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE, + 0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) { + perror("Sending packet"); + exit(1); + } + return (1); +} + +/* + send_page takes a page from the current file and controls + sending it out the socket to the catcher. It calls send_buffer + to do the actuall call to sendto. +*/ +int send_page(int page) +{ + if (verbose>=2) fprintf(stderr, "in send_page\n"); + *total_bytes_ptr = htonl(pageSize+HEAD_SIZE); + *current_page_ptr = htonl(page); + + return send_buffer(pageSize); +} + + +void send_cmd(int code, int pages) +{ + *mode_ptr = htonl(code); + *current_page_ptr = htonl(pages); + *total_bytes_ptr = htonl(HEAD_SIZE); + + send_buffer(0); +} + + +void send_all_done_cmd() +{ + *mode_ptr = htonl(ALL_DONE_CMD); + *current_page_ptr = 0; + *total_bytes_ptr = htonl(HEAD_SIZE) ; + + send_buffer(0); + if (verbose) fprintf(stderr, "(ALL DONE)\n"); +} @@ -0,0 +1,329 @@ +/* + Copyright (C) 2008 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Following the suggestion by Robert Dack <robd@kelman.com>, + I added the option to change the default IP address for multicasting + and the PORT for flow control. See mrsync.py for the new options. + + Copyright (C) 2001 Renaissance Technologies Corp. 
+ main developer: HP Wei <hp@rentec.com> + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include <net/if.h> +#ifdef _SUN +#include <sys/sockio.h> /* define SIOCGIFADDR */ +#else +#include <linux/sockios.h> +#endif + +extern char * my_MCAST_ADDR; /* defined in multicaster.c */ +extern int my_PORT; /* ditto */ +extern int my_TTL; +extern int my_LOOP; +extern char * my_IFname; /* defined in multicaster.c */ +extern int verbose; + +extern unsigned int nPages; +extern unsigned int last_bytes; /* the number of bytes in the last page of a file */ +extern int cur_entry; +extern int file_changed; +extern char* cmd_name[]; + +/* Where have we last sent from? */ +int most_recent_file; +int most_recent_fd; + +/* + Send buffer for storing the data and to be transmitted thru UDP + The format: + (5*sizeof(int) bytes header) + (PAGE_SIZE data area) + + The header has five int_type (4 bytes) int's. + (1) mode -- for master to give instructions to the target machines. + (2) current file index (starting with 1) + (3) current page index (starting with 1) -- gee! 
+ (4) bytes to be sent in this page via UPD + (5) total number of pages for this file + + data_ptr points to the data area. +*/ +int *mode_ptr; /* char type would cause alignment problem on Sparc */ +int *current_file_ptr; +int *current_page_ptr; +int *bytes_sent_ptr; +int *total_pages_ptr; +char *data_ptr; +char *fill_here; +char send_buff[PAGE_BUFFSIZE]; + +/* Send socket */ +int send_fd; +#ifndef IPV6 +struct sockaddr_in send_addr; +#else +struct sockaddr_in6 send_addr; +#endif + +/* for final statistics */ +extern unsigned int total_pages; +extern off_t total_bytes; +off_t real_total_bytes; +unsigned int real_total_pages; + +/* + set_mode sets the caster into a new mode. + modes are defined in main.h: +*/ +void set_mode(int new_mode) +{ + /* 20060323 convert it to network byte order */ + *mode_ptr = htonl(new_mode); +} + +/* init_sends initializes the send buffer */ +void init_sends() +{ + most_recent_file = most_recent_fd = -99999; + real_total_bytes = 0; + real_total_pages = 0; + + /* pointers for send buffer */ + mode_ptr = (int *)send_buff; + current_file_ptr = (int *)(mode_ptr+1); + current_page_ptr = (int *)(current_file_ptr + 1); + bytes_sent_ptr = (int *)(current_page_ptr + 1); + total_pages_ptr = (int *)(bytes_sent_ptr + 1); + data_ptr = (char *)(total_pages_ptr + 1); + fill_here = data_ptr; + + /* send socket */ + send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT); + + /******* change MULTICAST_IF ********/ + if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0) + perror("init_sends(): when set MULTICAST_IF\n"); + + /* set multicast_ttl such that UDP can go to 2nd subnetwork */ + if (mcast_set_ttl(send_fd, my_TTL) < 0) + perror("init_sends(): when set MULTICAST_TTL\n"); + + /* disable multicast_loop such that there is no echo back on master */ + if (mcast_set_loop(send_fd, my_LOOP) < 0) + perror("init_sends(): when set MULTICAST_LOOP\n"); +} + +void clear_send_buf() +{ + fill_here = data_ptr; +} + +/* + put file contents into send 
(UDP) buffer + return the number of bytes put into the buffer. +*/ +ssize_t pack_page_for_file(int page) +{ + if(verbose>=2) + fprintf(stderr, "Sending page %d of file %d\n", page, current_entry()); + + /*** + Adjust the position for reading + NOTE:if the type (off_t) is not given, the large file operation + would fail. + ***/ + lseek(most_recent_fd, (off_t)PAGE_SIZE * (off_t)(page - 1), SEEK_SET); + + /* read it and put the content into send_buff */ + /* the max number this read() will return is PAGE_SIZE */ + return read(most_recent_fd, data_ptr, PAGE_SIZE); +} + +int fexist(int entry) +{ + if (entry != most_recent_file) { + if (most_recent_fd > 0) close(most_recent_fd); /* make sure we close it */ + + if((most_recent_fd = open(getFullname(), O_RDONLY, 0)) < 0){ + perror(getFullname()); + } + most_recent_file = entry; + } + return (most_recent_fd >= 0); /* <0 means FAIL */ +} + + +/* + send_buffer will send the buffer with the file information + out to the socket connection with the catcher. + return 0 -- ok, -1 sent failed. +*/ +int send_buffer(int bytes_read) +{ + /* send the data */ + if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE, + 0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) { + perror("Sending packet"); + return FAIL; + } + return SUCCESS; +} + +/* + send_page takes a page from the current file and controls + sending it out the socket to the catcher. It calls send_buffer + to do the actuall call to sendto. +*/ +int send_page(int page) +{ + unsigned int bytes; + + if (verbose>=2) fprintf(stderr, "In send_page()\n"); + + if (file_changed) { + total_bytes -= ((page<nPages) ? 
PAGE_SIZE : last_bytes); + --total_pages; + return FAIL; + } + + bytes = (unsigned int) pack_page_for_file(page); + + if ((page < nPages && bytes != PAGE_SIZE) || + (nPages == page && bytes != last_bytes)) { + /* file under sync must have been changed during syncing */ + fprintf(stderr, "Warning: read() error, expected_bytes= %d, real= %d, " + "page = %d, nPages = %s%d, for file %s\n", + (page<nPages) ? PAGE_SIZE : last_bytes, + bytes, page, (current_entry()<0) ? "-" : "", nPages, getFullname()); + file_changed = TRUE; + /* make corrections in total_xxx that we send + otherwise the final statistic will be messed up */ + total_bytes -= ((page<nPages) ? PAGE_SIZE : last_bytes); + --total_pages; + return FAIL; + } + + /* fill in the header data */ + /* 20060323 -- add htonl so that the codes can work across + different machines with either little- or big-endian. + So, before we send out ints, we convert them to network-byte order*/ + *total_pages_ptr = htonl(nPages); + *bytes_sent_ptr = htonl(bytes); + *current_file_ptr = htonl(cur_entry); + *current_page_ptr = htonl(page); + + /* for final statistics */ + ++real_total_pages; + real_total_bytes += bytes; + + if (verbose>=3) + fprintf(stderr, "Sending page=%d of %d in file %d of %d\n", + page, nPages, + cur_entry, total_entries()); + return send_buffer(bytes); +} + +/* + send_test zeroes out the buffers going to the catcher + and thereby sends a test packet to the catcher. +*/ +void send_test() +{ + *mode_ptr = htonl(TEST); /* --> network byte order */ + + *current_file_ptr = 0; + *current_page_ptr = 0; + *total_pages_ptr = 0; + *bytes_sent_ptr = 0; + + send_buffer(0); + fprintf(stderr, "Test packet is sent.\n"); +} + +void send_cmd(int code, int machine_id) +{ + /* + Except for OPEN_FILE_CMD, + only the header area in the send_buff gets filled + with data. 
+ */ + *mode_ptr = htonl(code); /* --> network byte order */ + *current_page_ptr = htonl(machine_id); /* -1 being all machines */ + *current_file_ptr = htonl(cur_entry); + *total_pages_ptr = htonl(nPages); + + *bytes_sent_ptr = (code == OPEN_FILE_CMD) ? htonl(fill_here - data_ptr) :0; + + /* do the header in send_buf */ + send_buffer((code == OPEN_FILE_CMD) ? fill_here - data_ptr : 0); + + /* print message */ + if (verbose>=2) { + fprintf(stderr, "cmd [%s] sent\n", cmd_name[code]); + } +} + +void pack_open_file_info() +{ + /* prepare udp send_buffer for file info + (header) (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0 + */ + clear_send_buf(); + fill_here += fill_in_stat(data_ptr); + fill_here += fill_in_filename(fill_here); + if (is_softlink() || is_hardlink()) { + fill_here += fill_in_linktar(fill_here); + } +} + +void send_all_done_cmd() +{ + int i; + *mode_ptr = htonl(ALL_DONE_CMD); /* --> network byte order */ + + *current_file_ptr = 0; + *current_page_ptr = 0; + *total_pages_ptr = 0; + *bytes_sent_ptr = 0; + + for(i=0; i<10; ++i) { /* do it many times, in case network is busy */ + send_buffer(0); + usleep(DT_PERPAGE*10); + } + /* NOTE: it is still possible that ALL_DONE msg could not + be received by targets. + For total robustness, some independent checking on targets + should be done. + */ + fprintf(stderr, "(ALL DONE)\n"); +} + +void my_exit(int good_or_bad) +{ + if (send_fd) send_all_done_cmd(); + exit(good_or_bad); +} diff --git a/set_catcher_mcast.c b/set_catcher_mcast.c new file mode 100644 index 0000000..9a04017 --- /dev/null +++ b/set_catcher_mcast.c @@ -0,0 +1,142 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This file collects functions related to setting multicast + for multicatcher. They are IPv4 and IPv6 ready. + By default, we use IPv4. + To use IPv6, we need to specify -DIPv6 in Makefile. 
+ The functions in this file are collected from + Richard Stevens' Networking bible: Unix Network programming + + I added Mcast_join(). + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include <net/if.h> +#ifdef _SUN +#include <sys/sockio.h> /* define SIOCGIFADDR */ +#else +#include <linux/sockios.h> +#endif + +#define SA struct sockaddr + +int mcast_join(int sockfd, const SA *sa, socklen_t salen, + const char *ifname, u_int ifindex) +{ + switch (sa->sa_family) { + case AF_INET: { + struct ip_mreq mreq; + struct ifreq ifreq; + + memcpy(&mreq.imr_multiaddr, + &((struct sockaddr_in *) sa)->sin_addr, + sizeof(struct in_addr)); + + if (ifindex > 0) { + if (if_indextoname(ifindex, ifreq.ifr_name) == NULL) { + errno = ENXIO; /* i/f index not found */ + return(-1); + } + goto doioctl; + } else if (ifname != NULL) { + strncpy(ifreq.ifr_name, ifname, IFNAMSIZ); +doioctl: + if (ioctl(sockfd, SIOCGIFADDR, &ifreq) < 0) + return(-1); + memcpy(&mreq.imr_interface, + &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr, + sizeof(struct in_addr)); + } else + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + + return(setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mreq, sizeof(mreq))); + } + +#ifdef IPV6 + case AF_INET6: { + struct ipv6_mreq mreq6; + + memcpy(&mreq6.ipv6mr_multiaddr, + &((struct sockaddr_in6 
*) sa)->sin6_addr, + sizeof(struct in6_addr)); + + if (ifindex > 0) + mreq6.ipv6mr_interface = ifindex; + else if (ifname != NULL) + if ( (mreq6.ipv6mr_interface = if_nametoindex(ifname)) == 0) { + errno = ENXIO; /* i/f name not found */ + return(-1); + } + else + mreq6.ipv6mr_interface = 0; + + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, + &mreq6, sizeof(mreq6))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} + +int Mcast_join(int sockfd, const char *mcast_addr, + const char *ifname, u_int ifindex) +{ + #ifndef IPV6 + /* IPv4 */ + struct sockaddr_in sa; + sa.sin_family = AF_INET; + inet_pton(AF_INET, mcast_addr, &sa.sin_addr); + #else + struct sockaddr_in6 sa; + sa.sin6_family = AF_INET6; + inet_pton(AF_INET6, mcast_addr, &sa.sin6_addr); + #endif + + return (mcast_join(sockfd, (struct sockaddr *) &sa, sizeof(sa), + ifname, ifindex)); +} + +void sock_set_addr(struct sockaddr *sa, socklen_t salen, const void *addr) +{ + switch (sa->sa_family) { + case AF_INET: { + struct sockaddr_in *sin = (struct sockaddr_in *) sa; + + memcpy(&sin->sin_addr, addr, sizeof(struct in_addr)); + return; + } + + #ifdef IPV6 + case AF_INET6: { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *) sa; + + memcpy(&sin6->sin6_addr, addr, sizeof(struct in6_addr)); + return; + } + #endif + } + + return; +} + diff --git a/set_mcast.c b/set_mcast.c new file mode 100644 index 0000000..aac3d36 --- /dev/null +++ b/set_mcast.c @@ -0,0 +1,160 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This file collects functions related to setting multicast + for multicaster. They are IPv4 and IPv6 ready. + By default, we use IPv4. + To use IPv6, we need to specify -DIPv6 in Makefile. 
+ The functions in this file are collected from + Richard Stevens' Networking bible: Unix Network programming + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include "main.h" +#include <net/if.h> +#ifdef _SUN +#include <sys/sockio.h> /* define SIOCGIFADDR */ +#else +#include <linux/sockios.h> +#endif + +#define SA struct sockaddr +#define MAXSOCKADDR 128 /* max socket address structure size */ + +int sockfd_to_family(int sockfd) +{ + union { + struct sockaddr sa; + char data[MAXSOCKADDR]; + } un; + socklen_t len; + + len = MAXSOCKADDR; + if (getsockname(sockfd, (SA *) un.data, &len) < 0) + return(-1); + return(un.sa.sa_family); +} + +int mcast_set_if(int sockfd, const char *ifname, u_int ifindex) +{ + switch (sockfd_to_family(sockfd)) { + case AF_INET: { + struct in_addr inaddr; + struct ifreq ifreq; + + if (ifindex > 0) { + if (if_indextoname(ifindex, ifreq.ifr_name) == NULL) { + errno = ENXIO; /* i/f index not found */ + return(-1); + } + goto doioctl; + } else if (ifname != NULL) { + strncpy(ifreq.ifr_name, ifname, IFNAMSIZ); +doioctl: + if (ioctl(sockfd, SIOCGIFADDR, &ifreq) < 0) + return(-1); + memcpy(&inaddr, + &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr, + sizeof(struct in_addr)); + } else + inaddr.s_addr = htonl(INADDR_ANY); /* remove prev. 
set default */ + + return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_IF, + &inaddr, sizeof(struct in_addr))); + } + +#ifdef IPV6 + case AF_INET6: { + u_int index; + + if ( (index = ifindex) == 0) { + if (ifname == NULL) { + errno = EINVAL; /* must supply either index or name */ + return(-1); + } + if ( (index = if_nametoindex(ifname)) == 0) { + errno = ENXIO; /* i/f name not found */ + return(-1); + } + } + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_IF, + &index, sizeof(index))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} + +int mcast_set_loop(int sockfd, int onoff) +{ + switch (sockfd_to_family(sockfd)) { + case AF_INET: { + u_char flag; + + flag = onoff; + return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_LOOP, + &flag, sizeof(flag))); + } + +#ifdef IPV6 + case AF_INET6: { + u_int flag; + + flag = onoff; + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, + &flag, sizeof(flag))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} +/* end mcast_set_loop */ + +int mcast_set_ttl(int sockfd, int val) +{ + switch (sockfd_to_family(sockfd)) { + case AF_INET: { + u_char ttl; + + ttl = val; + return(setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, + &ttl, sizeof(ttl))); + } + +#ifdef IPV6 + case AF_INET6: { + int hop; + + hop = val; + return(setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, + &hop, sizeof(hop))); + } +#endif + + default: + errno = EPROTONOSUPPORT; + return(-1); + } +} + diff --git a/setup_socket.c b/setup_socket.c new file mode 100644 index 0000000..3625a2a --- /dev/null +++ b/setup_socket.c @@ -0,0 +1,242 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + make it IPv6-ready + Copyright (C) 2005 Renaissance Technologies Corp. + file name is changed from main.c to setup_socket.c + Copyright (C) 2001 Renaissance Technologies Corp. 
+ main developer: HP Wei <hp@rentec.com> + This file was modified in 2001 and later from files in the program + multicaster copyrighted by Aaron Hillegass as found at + <http://sourceforge.net/projects/multicaster/> + + Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ +/* + This part was based on + (1) codes by Aaron Hillegass <aaron@classmax.com> + (2) codes in Steven's book 'network programming' + + 200605 change it to make it IPv6 ready + +*/ + +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include <stdio.h> +#include <stdlib.h> +#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */ +#include <arpa/inet.h> /* inet(3) functions */ +#include <errno.h> + +extern int verbose; +int delay_sec; +int delay_usec; + + +/* Set up for catcher's complaint socket */ +#ifndef IPV6 +int complaint_socket(struct sockaddr_in *addr, int port) +#else +int complaint_socket(struct sockaddr_in6 *addr, int port) +#endif +{ + int fd, sockaddr_len; + sa_family_t family; + if (verbose>=2) fprintf(stderr, "in send_socket_ip\n"); + + #ifndef IPV6 + /* IPv4 */ + sockaddr_len = sizeof(struct sockaddr_in); + memset(addr, 0, sockaddr_len); + addr->sin_family = AF_INET; + family = AF_INET; + addr->sin_port = htons(port); + /*addr->sin_addr.s_addr = ip; 
this fx in for init process only + This ip will be overwritten later after + 1st packet is received. */ + #else + /* IPv6 */ + sockaddr_len = sizeof(struct sockaddr_in6); + memset(addr, 0, sockaddr_len); + addr->sin6_family = AF_INET6; + family = AF_INET6; + addr->sin6_port = htons(port); + /* addr->sin_addr.s_addr = ip; see comments above */ + #endif + + if ((fd = socket(family, SOCK_DGRAM, 0)) < 0){ + perror("Send socket"); + exit(1); + } + return fd; +} + +/* Set up mcast send socket for multicaster based on (char*)cp */ +#ifndef IPV6 +int send_socket(struct sockaddr_in *addr, char * cp, int port) +#else +int send_socket(struct sockaddr_in6 *addr, char * cp, int port) +#endif +{ + int fd, sockaddr_len; + sa_family_t family; + char buf[50]; + if (verbose>=2) fprintf(stderr, "in send_socket\n"); + + #ifndef IPV6 + /* IPv4 */ + sockaddr_len = sizeof(struct sockaddr_in); + memset(addr, 0, sockaddr_len); + addr->sin_family = AF_INET; + family = AF_INET; + addr->sin_port = htons(port); + /*addr->sin_addr.s_addr = inet_addr(cp);*/ + inet_pton(AF_INET, cp, &addr->sin_addr); + /* Print out IP address and port */ + inet_ntop(AF_INET, &addr->sin_addr, buf, 50); + #else + sockaddr_len = sizeof(struct sockaddr_in6); + memset(addr, 0, sockaddr_len); + addr->sin6_family = AF_INET6; + family = AF_INET6; + addr->sin6_port = htons(port); + /*addr->sin_addr.s_addr = inet_addr(cp);*/ + inet_pton(AF_INET6, cp, &addr->sin6_addr); + /* Print out IP address and port */ + inet_ntop(AF_INET6, &addr->sin6_addr, buf, 50); + #endif + + if (verbose>=2) + fprintf(stderr, "Creating a send socket to %s:%d\n", buf, port); + + if ((fd = socket(family, SOCK_DGRAM, 0)) < 0){ + perror("Send socket"); + exit(1); + } + + if ((bind(fd, (const struct sockaddr *)addr, sockaddr_len)) < 0){ + perror("in send_socket(): bind error (need to change MCAST_ADDR)"); + exit(1); + } + return fd; /*send_socket_ip(addr, address, port);*/ +} + +/* set up socket on the receiving end */ +#ifndef IPV6 +int 
rec_socket(struct sockaddr_in *addr, int port) +#else +int rec_socket(struct sockaddr_in6 *addr, int port) +#endif +{ + int fd, sockaddr_len; + sa_family_t family; + + #ifndef IPV6 + /* IPv4 */ + sockaddr_len = sizeof(struct sockaddr_in); + memset(addr, 0, sockaddr_len); + addr->sin_family = AF_INET; + family = AF_INET; + addr->sin_port = htons(port); + addr->sin_addr.s_addr = htonl(INADDR_ANY); + #else + sockaddr_len = sizeof(struct sockaddr_in6); + memset(addr, 0, sockaddr_len); + addr->sin6_family = AF_INET6; + family = AF_INET6; + addr->sin6_port = htons(port); + addr->sin6_addr = in6addr_any; /* RS book: page 92 */ + #endif + + if((fd = socket(family, SOCK_DGRAM, 0)) < 0){ + perror("Socket create"); + exit(1); + } + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, NULL, 0); + + if (verbose>=2) + fprintf(stderr, "Creating receive socket on port %d\n", port); + + /*if ((bind(fd, addr, sizeof(*addr))) < 0){ MOD by RWM: replaced by next line */ + if ((bind(fd, (const struct sockaddr *)addr, sockaddr_len)) < 0){ + perror("in rec_socket(): bind error (need to change PORT)"); + exit(1); + } + return fd; +} + +/* + set the values of two variables to be used by select() + in readable(). +*/ +void set_delay(int secs, int usecs) +{ + if (verbose>=2) { + if (usecs == -1) + fprintf(stderr, "Timeout: set to infinite\n"); + else + fprintf(stderr, "Timeout: set to %d sec + %d usec\n", secs, usecs); + } + delay_sec = secs; + delay_usec = usecs; +} + +void get_delay(int * secs, int * usecs) +{ + *secs = delay_sec; + *usecs= delay_usec; +} + +/* + Check if there is an incoming UDP for a certain amount + of time period specified in delay_tv.. + + Return 'true' if there is. + Return 'false' if no valid UDP has arrived within the time period. + + In multicaster, this is used in read_handle_page() right after + send_page() is carried out. As a result, read_handle_complaint() waits + for any incoming complaints from any target machines within + the specified time period. 
As it is now, no target machine + will send back complaints during the transmission of all the + pages for a file. Therefore, read_handle_complaint() serves effectively + as a time delay between sending of each page. +*/ +int readable(int fd) +{ + struct timeval delay_tv; + fd_set rset; + FD_ZERO(&rset); + FD_SET(fd, &rset); + + /* + if microsec == -1 wait forever for a packet. + This is used in the beginning when multicatcher is just + invoked. + */ + if (delay_usec == -1){ + return(select(fd + 1, &rset, NULL, NULL, NULL)); + } else { + delay_tv.tv_sec = delay_sec; + delay_tv.tv_usec = delay_usec; + return (select(fd + 1, &rset, NULL, NULL, &delay_tv)); + } +} + diff --git a/signal.c b/signal.c new file mode 100644 index 0000000..5b49b52 --- /dev/null +++ b/signal.c @@ -0,0 +1,93 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + The code in this file is copied from + Richard Stevens' book + "UNIX Network Programming" Chap.22.3 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+*/ + +#include "signal.h" + +Sigfunc * signal(int signo, Sigfunc *func) +{ + struct sigaction act, oact; + + act.sa_handler = func; + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + if (signo == SIGALRM) { +#ifdef SA_INTERRUPT + act.sa_flags |= SA_INTERRUPT; /* SunOS 4.x */ +#endif + } else { +#ifdef SA_RESTART + act.sa_flags |= SA_RESTART; /* SVR4, 44BSD */ +#endif + } + if (sigaction(signo, &act, &oact) < 0) + return(SIG_ERR); + return(oact.sa_handler); +} +/* end signal */ + +Sigfunc * Signal(int signo, Sigfunc *func) /* for our signal() function */ +{ + Sigfunc *sigfunc; + + if ( (sigfunc = signal(signo, func)) == SIG_ERR) + perror("signal error"); + return(sigfunc); +} + +int Fcntl(int fd, int cmd, int arg) +{ + int n; + + if ( (n = fcntl(fd, cmd, arg)) == -1) + perror("fcntl error"); + return(n); +} + +int Ioctl(int fd, int request, void *arg) +{ + int n; + + if ( (n = ioctl(fd, request, arg)) == -1) + perror("ioctl error"); + return(n); /* streamio of I_LIST returns value */ +} + +void Sigemptyset(sigset_t *set) +{ + if (sigemptyset(set) == -1) + perror("sigemptyset error"); +} + +void Sigaddset(sigset_t *set, int signo) +{ + if (sigaddset(set, signo) == -1) + perror("sigaddset error"); +} + +void Sigprocmask(int how, const sigset_t *set, sigset_t *oset) +{ + if (sigprocmask(how, set, oset) == -1) + perror("sigprocmask error"); +} diff --git a/signal.h b/signal.h new file mode 100644 index 0000000..bf309f8 --- /dev/null +++ b/signal.h @@ -0,0 +1,32 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#include <sys/types.h> +#include <stdio.h> +#include <signal.h> +#include <unistd.h> +/******* on linux: ioctl() is defined in sys/ioctl.h instead of unistd.h as on SunOS *****/ +#include <sys/ioctl.h> +#include <fcntl.h> + +typedef void Sigfunc(int); /* for signal handlers */ + diff --git a/timing.c b/timing.c new file mode 100644 index 0000000..d4baa15 --- /dev/null +++ b/timing.c @@ -0,0 +1,109 @@ +/* + Copyright (C) 2006 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + Copyright (C) 2005 Renaissance Technologies Corp. + main developer: HP Wei <hp@rentec.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. + If not, write to the Free Software Foundation, + 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
/* end of the license header */

#include <sys/time.h>
#include <stdio.h>

#define N 501  /* effectively set the maximum time in rtt_hist to be 500 msec */

extern int verbose;   /* defined in another translation unit */

/* for timing */
struct timeval tv0, tv1;           /* stamps written by start_timer() / end_timer() */
unsigned long usec_acc, sec_acc;   /* accumulator of timing */
unsigned int rtt_hist[N];          /* rtt_hist[i] = count of rtt within (i, i+1);
                                      the last slot collects everything beyond N-2 */

/* Reset both accumulators to zero. */
void refresh_timer(void)
{
  usec_acc = 0;
  sec_acc = 0;
}

/*
   Record the start stamp in tv0.
   The timezone argument is NULL: POSIX leaves the behaviour unspecified
   for a non-null pointer, and the value was never used here.
*/
void start_timer(void)
{
  gettimeofday(&tv0, NULL);
}

/* Record the end stamp in tv1. */
void end_timer(void)
{
  gettimeofday(&tv1, NULL);   /* end timer -------- */
}

/*
   Add the interval tv1 - tv0 to the accumulators.
   When the microsecond part of tv1 is smaller than that of tv0, one second
   is borrowed so that the microsecond contribution stays non-negative.
*/
void update_time_accumulator(void)
{
  if (tv1.tv_usec < tv0.tv_usec) {
    sec_acc  += (tv1.tv_sec - tv0.tv_sec - 1);
    usec_acc += (1000000 + tv1.tv_usec - tv0.tv_usec);
  } else {
    sec_acc  += (tv1.tv_sec - tv0.tv_sec);
    usec_acc += (tv1.tv_usec - tv0.tv_usec);
  }
}

/* Accumulated time expressed in seconds. */
double get_accumulated_time(void)
{
  double sec = sec_acc;
  sec += (usec_acc / 1e6);
  return sec;
}

/* Accumulated time expressed in microseconds. */
double get_accumulated_usec(void)
{
  double usec = usec_acc;
  usec += (sec_acc * 1e6);
  return usec;
}

/* Clear every histogram slot. */
void init_rtt_hist(void)
{
  int i;
  for (i = 0; i < N; ++i) rtt_hist[i] = 0;
}

/*
   Count one sample in slot rtt/1000 (so rtt is presumably given in
   microseconds -- confirm against the callers).  Samples whose slot would
   exceed N-2 are counted in the last slot, N-1.
*/
void update_rtt_hist(unsigned int rtt)
{
  unsigned int index;
  index = rtt / 1000;
  if (index > (N-2)) index = N-1;
  rtt_hist[index]++;
}

/*
   Print the non-empty histogram slots to stderr.
   Slots above 10 are shown only when verbose is greater than 1.
*/
void pr_rtt_hist(void)
{
  int i;
  fprintf(stderr, "rtt histogram\n");
  fprintf(stderr, "msec counts\n");
  fprintf(stderr, "---- --------\n");
  for (i = 0; i < N; ++i) {
    if (verbose <= 1 && i > 10) continue;
    if (rtt_hist[i] != 0) {
      fprintf(stderr, "%4d %u\n", i, rtt_hist[i]);
    }
  }
}

/* Return the count held in the last histogram slot (index N-1). */
unsigned int pages_wo_ack(void)
{
  return rtt_hist[N-1];
}

/* ---- next file in this patch:
   diff --git a/trFilelist.c b/trFilelist.c new file mode 100644 index 0000000..8f65796
   --- /dev/null +++ b/trFilelist.c @@ -0,0 +1,449 @@ ---- */

/*
   Copyright (C) 2008 Renaissance Technologies Corp.
                 main developer: HP Wei <hp@rentec.com>
   Copyright (C) 2006 Renaissance Technologies Corp.
*/
                 main developer: HP Wei <hp@rentec.com>

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.
   If not, write to the Free Software Foundation,
   59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/

/*** This code parses the synclist file generated by rsync in dry-run mode.
     The minimum options for rsync are: -avW --dry-run --delete
     e.g.
       /usr/local/bin/rsync --rsync-path=/usr/local/bin/rsync
              -avW --dry-run --delete
              /src/path/ dest_machine:/target/path/ > output 2>&1
A typical output of rsync may look like this:
Client is very old version of rsync, upgrade recommended.
building file list ... done
xyz -> ./sub1/xyz
file1
fn
fn1
j
sub1/hardlink_to_file1
sub1/path/testfile
sub1/xyz
sent 337 bytes read 44 bytes 254.00 bytes/sec
total size is 320751046 speedup is 841866.26

This code does the following four things.
/*
(1) skip the lines before 'done' and after 'wrote'
(2) output all directories and file_path
    e.g
    for an entry: sub1/path/testfile, the output is
      sub1
      sub1/path
      sub1/path/testfile
    NOTE: output_subs() below currently returns immediately, so the
    parent directories are not printed by it.
(3) xyz -> ./sub1/xyz
    the output is
      xyz
(4) If file1 and sub1/hardlink are hardlinked
    the output is
      file1
      sub1/hardlink file1

For the example output above, the output of this code is:

xyz
file1
fn
fn1
j
sub1
sub1/hardlink_to_file1 file1
sub1/path
sub1/path/testfile
sub1/xyz

***/

#include <stdio.h>
#include <stdlib.h>
#include <limits.h>   /* to define PATH_MAX */
#include <string.h>
#include <sys/stat.h>

#define TRUE 1
#define FALSE 0

/* malloc() that never returns NULL: on failure it reports and exits. */
static void * alloc_or_die(size_t nbytes)
{
  void * p = malloc(nbytes);
  if (p == NULL) {
    fprintf(stderr, "trFilelist: out of memory\n");
    exit(-1);
  }
  return p;
}

/* Heap copy of str (what strdup() did before, but with the result checked). */
static char * copy_string(const char * str)
{
  size_t nbytes = strlen(str) + 1;
  char * copy = alloc_or_die(nbytes);
  memcpy(copy, str, nbytes);
  return copy;
}

/* Growable array of heap-allocated strings; the used part is [str, endp). */
struct string_list {
  int capacity;    /* number of slots allocated at str */
  char ** endp;    /* one past the last used slot */
  char ** str;     /* first slot */
};

/* Make the list empty with room for n strings (at least one slot is allocated). */
void init_string_list(struct string_list * str_ptr, int n)
{
  if (n < 1) n = 1;   /* a zero capacity could never be doubled */
  str_ptr->str = alloc_or_die((size_t)n * sizeof *str_ptr->str);
  str_ptr->capacity = n;
  str_ptr->endp = str_ptr->str;
}

/* Double the capacity, keeping the stored pointers in order. */
void grow_string_list(struct string_list * slp)
{
  int new_capacity = (slp->capacity > 0) ? 2 * slp->capacity : 1;
  size_t used = (size_t)(slp->endp - slp->str);
  char ** newp = alloc_or_die((size_t)new_capacity * sizeof *newp);

  memcpy(newp, slp->str, used * sizeof *newp);
  free(slp->str);
  slp->str = newp;
  slp->endp = newp + used;
  slp->capacity = new_capacity;
}

/* Append a private copy of str, growing the list first when it is full. */
void append_string_list(char * str, struct string_list * slp)
{
  if (slp->endp - slp->str == slp->capacity) grow_string_list(slp);
  *slp->endp = copy_string(str);
  (slp->endp)++;
}

/*************** change to return index ****/
/* Index of the first entry equal to str, or -1 when there is none. */
int find_string(char * str, struct string_list * slp)
{
  int i;
  int n = slp->endp - slp->str;

  for (i = 0; i < n; ++i) {
    if (strcmp((slp->str)[i], str) == 0) return i;
  }
  return -1;
}

/*
   Index of the first entry that is a leading part (prefix) of str, or -1.
   NOTE(review): the comparison is character based, so an entry "db" also
   matches "db2/file" -- confirm whether a path-component match is intended.
*/
int has_sub_string(char * str, struct string_list *slp)
{
  int i;
  int n = slp->endp - slp->str;

  for (i = 0; i < n; ++i) {
    const char * entry = (slp->str)[i];
    if (strncmp(str, entry, strlen(entry)) == 0) return i;
  }
  return -1;
}

/* Index of the first entry that starts with str, or -1 when there is none. */
int has_newdir(char *str, struct string_list *slp)
{
  int i;
  int n = slp->endp - slp->str;
  size_t prefix_len = strlen(str);

  for (i = 0; i < n; ++i) {
    if (strncmp((slp->str)[i], str, prefix_len) == 0) return i;
  }
  return -1;
}

/* Growable array of unsigned ints; the used part is [d, endp). */
struct uint_list {
  int capacity;           /* number of slots allocated at d */
  unsigned int * endp;    /* one past the last used slot */
  unsigned int * d;       /* first slot */
};

/* Make the list empty with room for n values (at least one slot is allocated). */
void init_uint_list(struct uint_list * uil_ptr, int n)
{
  if (n < 1) n = 1;   /* a zero capacity could never be doubled */
  uil_ptr->d = alloc_or_die((size_t)n * sizeof *uil_ptr->d);
  uil_ptr->capacity = n;
  uil_ptr->endp = uil_ptr->d;
}

/* Double the capacity, keeping the stored values in order. */
void grow_uint_list(struct uint_list * uilp)
{
  int new_capacity = (uilp->capacity > 0) ? 2 * uilp->capacity : 1;
  size_t used = (size_t)(uilp->endp - uilp->d);
  unsigned int * newp = alloc_or_die((size_t)new_capacity * sizeof *newp);

  memcpy(newp, uilp->d, used * sizeof *newp);
  free(uilp->d);
  uilp->d = newp;
  uilp->endp = newp + used;
  uilp->capacity = new_capacity;
}

/* Append data, growing the list first when it is full. */
void append_uint_list(unsigned int data, struct uint_list * uilp)
{
  if (uilp->endp - uilp->d == uilp->capacity) grow_uint_list(uilp);
  *uilp->endp = data;
  (uilp->endp)++;
}

/*************** change to return index ****/
/* Index of the first entry equal to data, or -1 when there is none. */
int find_unit(unsigned int data, struct uint_list * uilp)
{
  int i;
  int n = uilp->endp - uilp->d;

  for (i = 0; i < n; ++i) {
    if ((uilp->d)[i] == data) return i;
  }
  return -1;
}

struct string_list file_list;
struct uint_list ino_list;
struct string_list dir_list;
struct string_list softlink_list; /* for (a) */
struct string_list newdir_list;   /* for (b) */

/*
   Remove trailing '\n' and spaces, then leading spaces, in place.
   The backward scan is bounded by the start of the string, so an empty or
   all-blank line no longer reads or writes in front of str[0].
*/
void strip(char * str)
{
  size_t len = strlen(str);
  size_t lead = 0;

  /* remove trailing \n and spaces */
  while (len > 0 && (str[len-1] == ' ' || str[len-1] == '\n')) {
    str[--len] = '\0';
  }

  /* 20080317 remove leading spaces */
  while (str[lead] == ' ') ++lead;
  if (lead > 0) {
    memmove(str, str + lead, len - lead + 1);   /* +1 moves the terminator too */
  }
}

/*
   Meant to do (2) of the file description: print every parent directory
   of str once.  It is switched off for testing and returns immediately;
   the disabled body is kept below for reference.
*/
void output_subs(char * str)
{
  (void)str;
  return; /*************************** testing ***************/
  /* to do (2) indicated in the above */
  /********
  char * pc;
  char subs[PATH_MAX];
  pc = strstr(str, "/");
  if (!pc) return;

  while (pc) {
    strncpy(subs, str, pc-str);
    subs[pc-str] = '\0';
    if (find_string(subs, &dir_list)<0) {
      printf("%s\n", subs);
      append_string_list(subs, &dir_list);
    }
    pc = strstr(pc+1, "/");
  }
  ************/
}

/*** (a)
  get those softlinks that points to a directory
  this is to deal with the following scenario
  previous structure
     dir_path (a directory)
     db (a directory)

  newly updated structure on master
     dir_path -> db
     db

  rsync --dry-run generates
     dir_path -> db [a link is done on target]
     deleting dir_path/sub/filename1 [wrong file gets removed ]
     deleting dir_path/sub/filename2...

  file_operations.c does this when dir_path -> db is due
     delete dir_path (rm -rf)
     make the softlink
  But then the following delete will have undesired deletion.

  ------------------------------------------------------------

  (b)
  t0 name -> xyz     name -> xyz (target)
  t1 name/           name -> xyz

  rsync generates
     name/ update_directory() won't have effect
     name/f1 delivered to wrong place
     name/f2
     deleting name too late
  ** the deletion should be done before not after.
  For now, I will fail this code for this situation.

***/
+ +***/ +void get_dir_softlinks(char *filename, char * basedir) { + FILE * fd; + char line[PATH_MAX]; + struct stat st; + + if ((fd = fopen(filename, "r")) == NULL) { + fprintf(stderr, "Cannot open file -- %s \n", filename); + exit(-1); + } + + while (1) { /* for each line in the file */ + char *pc; + char fn[PATH_MAX]; + + if (fgets(line, PATH_MAX, fd)==NULL) break; + strip(line); + if (strlen(line) == 0) continue; /* skip blank line */ + + /* the softlink case is indicated by -> */ + pc= strstr(line, " -> "); + if (pc) { /* it is a softlink */ + *pc = '\0'; + /* check if it is a directory */ + sprintf(fn, "%s/%s", basedir, line); + + /* check if the link-target is a directory */ + if (stat(fn, &st)<0) continue; /* We skip this bad entry - no longer exist */ + + if (S_ISDIR(st.st_mode)) { + append_string_list(line, &softlink_list); + } + } else { /* not a softlink --> find if it is a directory */ + /* find a line without ' ' and with trailing '/' */ + pc = strstr(line, " "); /* the first space */ + if (!pc) { + char * plast = &line[0] + strlen(line) - 1; + if (*plast == '/') { + append_string_list(line, &newdir_list); + } + } + } + } + + fclose(fd); +} + + +int main(int argc, char * argv[]) +{ + char * filename; + char * basedir; + FILE *fd; + char line[PATH_MAX]; + + if (argc < 3) { + fprintf(stderr, "Usage: trFilelist synclist_filename basedir\n"); + exit(-1); + } + + filename = argv[1]; + basedir = argv[2]; + + init_string_list(&file_list, 10); + init_uint_list(&ino_list, 10); + init_string_list(&dir_list, 100); + init_string_list(&softlink_list, 10); + init_string_list(&newdir_list, 100); + + get_dir_softlinks(filename, basedir); + + if ((fd = fopen(filename, "r")) == NULL) { + fprintf(stderr, "Cannot open file -- %s \n", filename); + return -1; + } + + while (1) { /* for each line in the file */ + char *pc; + char fn[PATH_MAX]; + struct stat st; + int newdir_flag; + + if (fgets(line, PATH_MAX, fd)==NULL) break; + strip(line); + if (strlen(line) == 0) 
continue; /* skip blank line */ + if (strcmp(line, ".")==0) continue; + if (strcmp(line, "./")==0) continue; + + /* first we look for deleting entry */ + if (strncmp(line, "deleting ", 9)==0) { + /* deleting (directory) file_path */ + char * p1, *p2, *pf; + + p1 = strstr(line, " "); /* the first space */ + p2 = strstr(p1+1, " "); /* deleting directory filepath * 20070912 this is old */ + pf = (p2) ? p2+1 : p1+1;/* it's always p1+1 */ + + newdir_flag = has_newdir(pf, &newdir_list); + + if ((has_sub_string(pf, &softlink_list)<0) && newdir_flag<0) { + /* see comments above get_dir_softlinks() */ + printf("deleting %s\n", pf); + } else if (newdir_flag>=0) { /* temporary action */ + /*** we can simply skip this block later. 20070912 ***/ + /***/ + fprintf(stderr, "CRITICAL ERROR: An old softlink has been changed to a directory!\n"); + fprintf(stderr, " For now, we crash this code for human intervention\n"); + fprintf(stderr, " line= %s\n", line); + exit(-1); + /***/ + } + + continue; + } + + /* the softlink case is indicated by -> */ + pc= strstr(line, " -> "); + if (pc) { + *pc = '\0'; + output_subs(line); + printf("%s\n", line); + continue; + } + + /* if rsync's -H is turned on, the output may contain + file => tar_hardlink_file (relative address) + */ + pc= strstr(line, " => "); + if (pc) { + *pc = '\0'; + output_subs(line); + printf("%s %s\n", line, pc+4); + continue; + } + + /* the rest of the entries should be valid paths */ + sprintf(fn, "%s/%s", basedir, line); + if (lstat(fn, &st)<0) continue; /* We skip this bad entry - + (1) the header and tail lines + (2) perhaps the file no longer exists */ + + /* is this a hardlink? 
*/ + if (st.st_nlink > 1) { + int index; + output_subs(line); + if ((index = find_unit((unsigned int)st.st_ino, &ino_list))<0) { + append_uint_list((unsigned int)st.st_ino, &ino_list); + append_string_list(line, &file_list); /* relative path */ + printf("%s\n", line); + } else { + printf("%s %s\n", line, file_list.str[index]); + } + continue; + } + + /* all others */ + output_subs(line); + printf("%s\n", line); + } /* end of one line */ + + fclose(fd); + return 0; +} |
