about | summary | refs | log | tree | commit | diff | stats
path: root/mrsync.py
diff options
context:
space:
mode:
author Guillaume Horel <guillaume.horel@serenitascapital.com> 2015-11-04 12:30:44 -0500
committer Guillaume Horel <guillaume.horel@serenitascapital.com> 2015-11-04 12:30:44 -0500
commit a5309fed914fdaa7697f2d369e7dcd02309063ab (patch)
tree 975bb588c4d9072ae1158ab670bf9fa851abd6f4 /mrsync.py
download mrsync-a5309fed914fdaa7697f2d369e7dcd02309063ab.tar.gz
initial import
Diffstat (limited to 'mrsync.py')
-rwxr-xr-x mrsync.py 284
1 files changed, 284 insertions, 0 deletions
diff --git a/mrsync.py b/mrsync.py
new file mode 100755
index 0000000..bd8c0c7
--- /dev/null
+++ b/mrsync.py
@@ -0,0 +1,284 @@
+#!/usr/bin/env python
+
+import os,sys,string,time,getopt;
+
+my_module_path = os.path.dirname(sys.argv[0]);
+sys.path.append(my_module_path);
+from mrsync_config import *
+
+host = os.uname()[1];
+
+def bin(path, bin_name):
+ "return the full-path of a binary code"
+ return '%s/%s' % (path, bin_name);
+
+def isPathThere(path):
+ "check the existence of path on this master machine"
+ if not os.path.exists(path):
+ print >>sys.stderr, 'Path (%s) could not be found on %s' % (path, host);
+ sys.exit(-1);
+
+map(isPathThere, [bin(binDir, x) for x in codes]);
+
+def printTimeMsg(msg):
+ print >>sys.stderr, 'Time = %s %s' % (time.ctime(time.time()), msg);
+
+def prMulticastLog(msg):
+ "append msg to the log file"
+ open(multicast_log, 'a').write('%s %s\n' % (time.ctime(time.time()), msg));
+
+(catcher_err_log, goodTargetsFile, syncFileList) = \
+ map(lambda x:
+ '%s.%s' % (x, ('%02d'*5) % time.localtime()[1:6]),
+ (catcher_err_log, goodTargetsFile, syncFileList));
+
+def usage():
+ print """mrsync.py (to sync files from one to many machines by multicast)
+ Option list:
+ [ -v <verbose_level 0-2 (2 is for debug)> ]
+ [ -w <ack_wait_times default= 60 (secs) ]
+ [ -r <remote shell to invoke multicatcher, default=rsh> ]
+ [ -b <remote_bin_dir_path for multicatcher, default=%s> ]
+ [ -o <more rsync options such as --include --exclude,
+ to be appended to default min_opts = %s>]
+ [ -x flag to turn off monitor mechanism (not fully tested and not recommended) ]
+ ----- Essential options --------------------------------------------------------
+ -m <machine_list_file_path or csv_list (name1,name2...)>
+ -s <source_data_path>
+ [ -t <target_data_path; default = that in -s option> ]
+ [ -l <list of files(wildcards) to be synced, can be a filepath or csv_list>
+ mrsync by default uses rsync to find the list unless this option is given. ]
+ ----- mcast options ------------------------------------------------------------
+ [ -A <my_MCAST_ADDR such as 239.255.67.92> ]
+ [ -P <my_PORT such as 9000> ]
+ [ -T <my_TTL default=1> ]
+ [ -L flag to turn on mcast_LOOP. default is off ]
+ [ -I <my_MCAST_IF default=NULL> ]
+ """ % (rBinDir, min_rsync_opt%reshell);
+
+# --- handle command line
+opts, args = getopt.getopt(sys.argv[1:], 'hxv:w:r:b:o:m:s:t:l:A:P:T:LI:', []);
+
+if len(opts)==0 or len(args)>0:
+ usage();
+ sys.exit(-1);
+
+verbose = 0;
+ack_wait_times = -1;
+without_monitor = False;
+machineListFile = '';
+sourcePath = '';
+targetPath = '';
+synclist = '';
+rsync_opts = min_rsync_opt % reshell;
+
+mcast_addr = '';
+port = -1;
+ttl = 1;
+loop = False;
+mcast_if = '';
+
+if not len(opts) == 0:
+ for o,v in opts:
+ if o=='-v':
+ verbose = string.atoi(v);
+ elif o=='-w':
+ ack_wait_times = string.atoi(v);
+ elif o=='-h':
+ usage();
+ sys.exit(0);
+ elif o=='-r':
+ reshell = v;
+ elif o=='-b':
+ rBinDir = v;
+ elif o=='-o':
+ rsync_opts += (' %s' % v)
+ elif o=='-m':
+ machineListFile = v;
+ elif o=='-s':
+ sourcePath = v;
+ elif o=='-t':
+ targetPath = v;
+ elif o=='-l':
+ synclist = v;
+ elif o=='-A':
+ mcast_addr = v;
+ elif o=='-P':
+ port = string.atoi(v);
+ elif o=='-T':
+ ttl = string.atoi(v);
+ elif o=='-L':
+ loop = True;
+ elif o=='-I':
+ mcast_if = v;
+ elif o=='-x':
+ without_monitor = True;
+
+if verbose>=1: print 'mrsync version 4.0.0';
+
+if not machineListFile or not sourcePath:
+ print >>sys.stderr, 'Essential options (-m -s) should be specified.';
+ usage();
+ sys.exit(-1);
+
+isPathThere(sourcePath);
+if not targetPath: targetPath = sourcePath;
+
+# ------------ get machine list
+machines = (os.path.exists(machineListFile) and
+ [x[:-1] for x in open(machineListFile).readlines()] or
+ machineListFile.split(','));
+
+# clean up the names
+machines = filter(lambda x: x!='', machines);
+machines = [x.strip() for x in machines];
+
+if host in machines:
+ if verbose>=1: print 'remove myself (%s) from machine list...' % host;
+ machines.remove(host);
+
+# ------------ get the syncList from the first good machine
+# the list is stored in syncFileList for multicaster.
+import cmdToTarget;
+
+def cleanup(file):
+ "remove a file if it exists"
+ if os.path.exists(file): os.unlink(file);
+
+def get_synclist_from_cmdline(tmp):
+ "extracts synclist from cmdline option, outputs results into tmp_file"
+ cleanup(tmp);
+ if verbose>=1: print >>sys.stderr, 'extracting %s...' % synclist;
+
+ list = (os.path.exists(synclist) and
+ [x[:-1] for x in open(synclist).readlines()] or
+ synclist.split(','));
+
+ if len(list)==0:
+ print >>sys.stderr, "Empty sync_list in cmd_line option %s" % synclist;
+ sys.exit(-1);
+
+ def pr_relative_path(fullpath):
+ open(tmp, 'a').write('%s\n' % fullpath.replace(sourcePath+'/', ''));
+
+ for item in list:
+ " each item can be a pattern for files"
+ import glob
+ map(pr_relative_path, glob.glob('%s/%s' % (sourcePath, item)));
+
+
+def get_synclist_from_rsync(tmp):
+ "use rsync to get a list of to-be-synced files. results are put in tmp file"
+ cleanup(tmp);
+ for machine in machines:
+ # check if this machine is ok to rsh (ssh)
+ if (not cmdToTarget.isMachineOK(reshell, machine)): # if not go to next machine
+ continue;
+
+ if verbose>=1:
+ print >>sys.stderr, 'Get to-be-synced files from %s...' % machine;
+
+ cmd = ' '.join(filter(lambda x: x,
+ [rsyncPath, '--rsync-path='+remote_rsyncPath, \
+ (reshell != 'rsh' and \
+ rsync_opts.replace('--rsh=rsh', '--rsh=%s' % reshell) \
+ or rsync_opts),
+ sourcePath+'/',
+ '%s:%s/' % (machine, targetPath),
+ '> %s 2>&1; ' % tmp]));
+
+ if verbose>=1: print >>sys.stderr, cmd;
+ os.system(cmd);
+ break;
+
+def tmp_synclist():
+ "intermediate synclist filename"
+ return '/tmp/%s' % os.path.basename(syncFileList);
+
+(synclist and get_synclist_from_cmdline or get_synclist_from_rsync)(tmp_synclist());
+
+def ckFileSize(file):
+ if os.path.getsize(file)==0:
+ print >>sys.stderr, "Empty file = %s" % file;
+ sys.exit(-1);
+
+ckFileSize(tmp_synclist());
+
+# translate the files generated by rsync or command-line option into
+# a format which can be recognized by multicaster.
+cmd = ' '.join([bin(binDir, translate),
+ tmp_synclist(), sourcePath, '>', syncFileList]);
+if verbose>=1: print >>sys.stderr, cmd;
+os.system(cmd);
+
+ckFileSize(syncFileList);
+
+# ------------ invoke multicatcher on all target machines
+def gen_catcher_cmd(count):
+ "return mulitcatcher_command to be invoked on target machines"
+ return ' '.join(filter(lambda x: x,
+ [bin(rBinDir, catcher),
+ '-t', targetPath,
+ '-i', '%d'%count, # machine id
+ (mcast_addr and '-A '+mcast_addr or ''),
+ (port>0 and '-P %d'%port or ''),
+ (mcast_if and '-I %s'%mcast_if or ''),
+ '</dev/null 1>/dev/null 2>%s &' % catcher_err_log])); ### workaround ssh problem
+
+def invoke_catcher(ms, count, bads):
+ "invoke catcher for each machine in ms, return bad_machines in bads"
+ if not ms: return bads;
+
+ machine = ms.pop(0);
+
+ if (not cmdToTarget.isMachineOK(reshell, machine)):
+ if verbose>=1: print >>sys.stderr, "***%s is down" % machine;
+ bads.append(machine);
+ return invoke_catcher(ms, count, bads);
+
+ cmd = gen_catcher_cmd(count);
+ if count==0 and verbose>=1: print >>sys.stderr,cmd;
+ status, output = cmdToTarget.docmd(reshell, machine, cmd);
+
+ if (not status):
+ if verbose>=1: print >>sys.stderr, "***remote shell exec failed for %s" % machine;
+ bads.append(machine);
+ return invoke_catcher(ms, count, bads);
+
+ if verbose>=1: print >>sys.stderr, 'id:%3d %s' % (count, machine);
+ open(goodTargetsFile, 'a').write('%s\n' % machine)
+ return invoke_catcher(ms, count+1, bads);
+
+printTimeMsg("Invoking multicatcher on all %d machines..." % len(machines));
+cleanup(goodTargetsFile);
+badMachines = invoke_catcher(machines, 0, []);
+
+# -------------- invoke multicast on the master machine
+printTimeMsg('Starting multicasting...');
+prMulticastLog('start multicast on %s' % host);
+
+def gen_caster_cmd():
+ "return mulitcaster_command to be invoked on this host (master machine)"
+ return ' '.join(filter(lambda x: x,
+ [bin(binDir, caster),
+ '-v %d' % verbose,
+ '-m %s' % goodTargetsFile,
+ '-s %s' % sourcePath,
+ '-f %s' % syncFileList,
+ (ack_wait_times>0 and '-w %d'% ack_wait_times or ''),
+ (mcast_addr and '-A '+mcast_addr or ''),
+ (port>0 and '-P %d'%port or ''),
+ (ttl>1 and '-T %d'%ttl or ''),
+ (loop and '-L' or ''),
+ (mcast_if and '-I %s'%mcast_if or ''),
+ (without_monitor and '-x' or '')]));
+
+cmd = gen_caster_cmd();
+if verbose>=1: print cmd;
+ex_code = os.system(cmd);
+print >>sys.stderr, 'ex_code= ', ex_code;
+
+# -------------- to exit
+printTimeMsg('ALL DONE.');
+prMulticastLog('multicast ends on %s' % host);
+sys.exit(ex_code);