aboutsummaryrefslogtreecommitdiffstats
path: root/sends.c
blob: 61aa07a015f83cfd15929ec7825416e5d3aebbf3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
/* 
   Copyright (C)  2008 Renaissance Technologies Corp.
                  main developer: HP Wei <hp@rentec.com>
   Copyright (C)  2006 Renaissance Technologies Corp.
                  main developer: HP Wei <hp@rentec.com>
   Copyright (C)  2005 Renaissance Technologies Corp.
                  main developer: HP Wei <hp@rentec.com>
     Following the suggestion by Robert Dack <robd@kelman.com>,
     I added the option to change the default IP address for multicasting
     and the PORT for flow control.  See mrsync.py for the new options.

   Copyright (C)  2001 Renaissance Technologies Corp.
                  main developer: HP Wei <hp@rentec.com>
   This file was modified in 2001 and later from files in the program 
   multicaster copyrighted by Aaron Hillegass as found at 
   <http://sourceforge.net/projects/multicaster/>

   Copyright (C)  2000 Aaron Hillegass <aaron@classmax.com>

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.
   If not, write to the Free Software Foundation,
   59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.  
*/

#include "main.h"
#include <net/if.h>
#ifdef _SUN
#include <sys/sockio.h>  /* define SIOCGIFADDR */
#else
#include <linux/sockios.h>
#endif

extern char *      my_MCAST_ADDR;  /* defined in multicaster.c */
extern int         my_PORT;        /* ditto */
extern int         my_TTL;
extern int         my_LOOP;
extern char *      my_IFname;      /* defined in multicaster.c */
extern int         verbose;

extern unsigned int nPages;
extern unsigned int last_bytes;    /* the number of bytes in the last page of a file */
extern int cur_entry;
extern int file_changed;
extern char* cmd_name[];

/* Where have we last sent from? */
int                most_recent_file;
int                most_recent_fd;

/* 
   Send buffer for storing the data and to be transmitted thru UDP 
   The format:
   (5*sizeof(int) bytes header) + (PAGE_SIZE data area)

   The header has five int_type (4 bytes) int's.
   (1) mode -- for master to give instructions to the target machines.
   (2) current file index (starting with 1)
   (3) current page index (starting with 1) -- gee!
   (4) bytes to be sent in this page via UPD 
   (5) total number of pages for this file

   data_ptr points to the data area.
*/
int               *mode_ptr; /* char type would cause alignment problem on Sparc */
int               *current_file_ptr;
int               *current_page_ptr;
int               *bytes_sent_ptr;
int               *total_pages_ptr;
char              *data_ptr;
char              *fill_here;
char               send_buff[PAGE_BUFFSIZE];

/* Send socket */
int                send_fd;
#ifndef IPV6
struct sockaddr_in send_addr;
#else
struct sockaddr_in6 send_addr;
#endif

/* for final statistics */
extern unsigned int total_pages;
extern off_t total_bytes;
off_t real_total_bytes;  
unsigned int real_total_pages;

/*
  set_mode sets the caster into a new mode.
  modes are defined in main.h:
*/
void set_mode(int new_mode)
{
  /* 20060323 convert it to network byte order */
  *mode_ptr = htonl(new_mode);
}

/* init_sends initializes the send buffer */
void init_sends()
{
  most_recent_file = most_recent_fd = -99999;
  real_total_bytes = 0;
  real_total_pages = 0;

  /* pointers for send buffer */
  mode_ptr = (int *)send_buff;  
  current_file_ptr = (int *)(mode_ptr+1);
  current_page_ptr = (int *)(current_file_ptr + 1);
  bytes_sent_ptr   = (int *)(current_page_ptr + 1);
  total_pages_ptr  = (int *)(bytes_sent_ptr + 1);
  data_ptr = (char *)(total_pages_ptr + 1);
  fill_here = data_ptr;

  /* send socket */
  send_fd = send_socket(&send_addr, my_MCAST_ADDR, my_PORT);
  
  /******* change MULTICAST_IF ********/
  if (my_IFname != NULL && mcast_set_if(send_fd, my_IFname, 0)<0) 
    perror("init_sends(): when set MULTICAST_IF\n");
  
  /* set multicast_ttl such that UDP can go to 2nd subnetwork */  
  if (mcast_set_ttl(send_fd, my_TTL) < 0) 
    perror("init_sends(): when set MULTICAST_TTL\n");
  
  /* disable multicast_loop such that there is no echo back on master */
  if (mcast_set_loop(send_fd, my_LOOP) < 0) 
    perror("init_sends(): when set MULTICAST_LOOP\n");  
}

void clear_send_buf()
{
  fill_here = data_ptr;
}

/*
  put file contents into send (UDP) buffer
  return the number of bytes put into the buffer.
*/
ssize_t pack_page_for_file(int page)
{
  if(verbose>=2)
    fprintf(stderr, "Sending page %d of file %d\n", page, current_entry());

  /*** 
     Adjust the position for reading
     NOTE:if the type (off_t) is not given, the large file operation
          would fail.
  ***/
  lseek(most_recent_fd, (off_t)PAGE_SIZE * (off_t)(page - 1), SEEK_SET);

  /* read it and put the content into send_buff */
  /* the max number this read() will return is PAGE_SIZE */
  return read(most_recent_fd, data_ptr, PAGE_SIZE);
}

int fexist(int entry) 
{
  if (entry != most_recent_file) {
    if (most_recent_fd > 0) close(most_recent_fd); /* make sure we close it */

    if((most_recent_fd = open(getFullname(), O_RDONLY, 0)) < 0){
      perror(getFullname());
    }
    most_recent_file = entry;    
  } 
  return (most_recent_fd >= 0);  /* <0 means FAIL */
}


/*
  send_buffer will send the buffer with the file information
  out to the socket connection with the catcher.
  return 0 -- ok, -1 sent failed.
*/
int send_buffer(int bytes_read)
{
    /* send the data */
    if(sendto(send_fd, send_buff, bytes_read + HEAD_SIZE, 
	      0, (const struct sockaddr *)&send_addr, sizeof(send_addr)) < 0) {
      perror("Sending packet");
      return FAIL;
    }
    return SUCCESS;
}

/*
  send_page takes a page from the current file and controls
  sending it out the socket to the catcher.  It calls send_buffer
  to do the actuall call to sendto.
*/
int send_page(int page)
{
  unsigned int bytes;

  if (verbose>=2) fprintf(stderr, "In send_page()\n");
  
  if (file_changed) {
    total_bytes -= ((page<nPages) ? PAGE_SIZE : last_bytes);
    --total_pages;
    return FAIL;
  }

  bytes = (unsigned int) pack_page_for_file(page);

  if ((page < nPages && bytes != PAGE_SIZE) || 
      (nPages == page && bytes != last_bytes)) {
    /* file under sync must have been changed during syncing */
    fprintf(stderr, "Warning: read() error, expected_bytes= %d, real= %d, "
	    "page = %d, nPages = %s%d, for file %s\n",
	    (page<nPages) ? PAGE_SIZE : last_bytes, 
	    bytes, page, (current_entry()<0) ? "-" : "", nPages, getFullname());
    file_changed = TRUE;
    /* make corrections in total_xxx that we send 
       otherwise the final statistic will be messed up */
    total_bytes -= ((page<nPages) ? PAGE_SIZE : last_bytes);
    --total_pages;
    return FAIL;
  }

  /* fill in the header data */
  /* 20060323 -- add htonl so that the codes can work across
     different machines with either little- or big-endian.
     So, before we send out ints, we convert them to network-byte order*/
  *total_pages_ptr = htonl(nPages);
  *bytes_sent_ptr  = htonl(bytes);
  *current_file_ptr = htonl(cur_entry);
  *current_page_ptr = htonl(page);

  /* for final statistics */
  ++real_total_pages;
  real_total_bytes += bytes; 

  if (verbose>=3)
    fprintf(stderr, "Sending page=%d of %d in file %d of %d\n",
	    page, nPages, 
	    cur_entry, total_entries());
  return send_buffer(bytes);
}

/*
  send_test zeroes out the buffers going to the catcher
  and thereby sends a test packet to the catcher.
*/
void send_test()
{
  *mode_ptr = htonl(TEST); /* --> network byte order */
  
  *current_file_ptr = 0;  
  *current_page_ptr = 0; 
  *total_pages_ptr = 0;
  *bytes_sent_ptr = 0;
  
  send_buffer(0);
  fprintf(stderr, "Test packet is sent.\n");
}

void send_cmd(int code, int machine_id) 
{
  /*
    Except for OPEN_FILE_CMD,
     only the header area in the send_buff gets filled
     with data.
  */
  *mode_ptr = htonl(code); /* --> network byte order */    
  *current_page_ptr = htonl(machine_id); /* -1 being all machines */
  *current_file_ptr = htonl(cur_entry);
  *total_pages_ptr = htonl(nPages);

  *bytes_sent_ptr = (code == OPEN_FILE_CMD) ?  htonl(fill_here - data_ptr) :0;

  /* do the header in send_buf */
  send_buffer((code == OPEN_FILE_CMD) ? fill_here - data_ptr : 0);

  /* print message */
  if (verbose>=2) {
    fprintf(stderr, "cmd [%s] sent\n", cmd_name[code]);
  }
}

void pack_open_file_info()
{
  /* prepare udp send_buffer for file info
     (header) (stat_ascii)\0(filename)\0(if_is_link linktar_path)\0
  */
  clear_send_buf();
  fill_here += fill_in_stat(data_ptr); 
  fill_here += fill_in_filename(fill_here);  
  if (is_softlink() || is_hardlink()) {
    fill_here += fill_in_linktar(fill_here);
  }
}

void send_all_done_cmd()
{
  int i;
  *mode_ptr = htonl(ALL_DONE_CMD); /* --> network byte order */
  
  *current_file_ptr = 0;  
  *current_page_ptr = 0; 
  *total_pages_ptr = 0;
  *bytes_sent_ptr = 0;
  
  for(i=0; i<10; ++i) { /* do it many times, in case network is busy */
    send_buffer(0); 
    usleep(DT_PERPAGE*10);
  }
  /* NOTE: it is still possible that ALL_DONE msg could not
           be received by targets.
           For total robustness, some independent checking on targets
           should be done.
  */
  fprintf(stderr, "(ALL DONE)\n");
}

void my_exit(int good_or_bad)
{
  if (send_fd) send_all_done_cmd();
  exit(good_or_bad);
}