PostgreSQL
Check-in [ff5cb39ea2]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a couple of bugs in pg_recvlogical output to stdout. Don't close stdout on SIGHUP. Also, when a SIGHUP is received, close the file immediately, rather than only after receiving some more data from the server. Rename a variable, to avoid mentally dealing with double negatives (not unsynced means synced).
Timelines: family | ancestors | descendants | both | trunk | WIN32_DEV | REL9_0_ALPHA4_BRANCH
Files: files | file ages | folders
SHA1:ff5cb39ea2d2a1f8207c68a7a16304fcee564a82
User & Date: heikki.linnakangas@iki.fi 2014-05-15 16:47:48
Context
2014-05-15
17:23
Fix whitespace check-in: de7408eae9 user: peter_e@gmx.net tags: trunk, WIN32_DEV, REL9_0_ALPHA4_BRANCH
16:47
Fix a couple of bugs in pg_recvlogical output to stdout. Don't close stdout on SIGHUP. Also, when a SIGHUP is received, close the file immediately, rather than only after receiving some more data fro... check-in: ff5cb39ea2 user: heikki.linnakangas@iki.fi tags: trunk, WIN32_DEV, REL9_0_ALPHA4_BRANCH
15:29
Handle duplicate XIDs in txid_snapshot. The proc array can contain duplicate XIDs, when a transaction is just being prepared for two-phase commit. To cope, remove any duplicates in txid_current_snaps... check-in: 92e43657bc user: heikki.linnakangas@iki.fi tags: trunk, WIN32_DEV, REL9_0_ALPHA4_BRANCH
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/bin/pg_basebackup/pg_recvlogical.c.

47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
...
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
...
299
300
301
302
303
304
305











306
307
308
309
310
311
312
...
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
...
464
465
466
467
468
469
470
471
472
473

474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490

491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
static const char *plugin = "test_decoding";

/* Global State */
static int	outfd = -1;
static volatile sig_atomic_t time_to_abort = false;
static volatile sig_atomic_t output_reopen = false;
static int64 output_last_fsync = -1;
static bool output_unsynced = false;
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;

static void usage(void);
static void StreamLog();
static void disconnect_and_exit(int code);

................................................................................
	output_last_fsync = now;

	output_fsync_lsn = output_written_lsn;

	if (fsync_interval <= 0)
		return true;

	if (!output_unsynced)
		return true;

	output_unsynced = false;

	/* Accept EINVAL, in case output is writing to a pipe or similar. */
	if (fsync(outfd) != 0 && errno != EINVAL)
	{
		fprintf(stderr,
				_("%s: could not fsync log file \"%s\": %s\n"),
				progname, outfile, strerror(errno));
................................................................................
		{
			/* Time to send feedback! */
			if (!sendFeedback(conn, now, true, false))
				goto error;

			last_status = now;
		}












		r = PQgetCopyData(conn, &copybuf, 1);
		if (r == 0)
		{
			/*
			 * In async mode, and no data available. We block on reading but
			 * not more than the specified timeout, so that we can send a
................................................................................

			/* Compute when we need to wakeup to send a keepalive message. */
			if (standby_message_timeout)
				message_target = last_status + (standby_message_timeout - 1) *
					((int64) 1000);

			/* Compute when we need to wakeup to fsync the output file. */
			if (fsync_interval > 0 && output_unsynced)
				fsync_target = output_last_fsync + (fsync_interval - 1) *
					((int64) 1000);

			/* Now compute when to wakeup. */
			if (message_target > 0 || fsync_target > 0)
			{
				int64		targettime;
................................................................................
		/* Extract WAL location for this block */
		{
			XLogRecPtr	temp = fe_recvint64(&copybuf[1]);

			output_written_lsn = Max(temp, output_written_lsn);
		}

		/* redirect output to stdout */
		if (outfd == -1 && strcmp(outfile, "-") == 0)
		{

			outfd = fileno(stdout);
		}

		/* got SIGHUP, close output file */
		if (outfd != -1 && output_reopen)
		{
			now = feGetCurrentTimestamp();
			if (!OutputFsync(now))
				goto error;
			close(outfd);
			outfd = -1;
			output_reopen = false;
		}

		if (outfd == -1)
		{


			outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
						 S_IRUSR | S_IWUSR);
			if (outfd == -1)
			{
				fprintf(stderr,
						_("%s: could not open log file \"%s\": %s\n"),
						progname, outfile, strerror(errno));
				goto error;
			}
		}

		bytes_left = r - hdr_len;
		bytes_written = 0;

		/* signal that a fsync is needed */
		output_unsynced = true;

		while (bytes_left)
		{
			int			ret;

			ret = write(outfd,
						copybuf + hdr_len + bytes_written,







|







 







|


|







 







>
>
>
>
>
>
>
>
>
>
>







 







|







 







|
|

>
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
>
|
|













|







47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
...
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
...
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
...
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
...
475
476
477
478
479
480
481
482
483
484
485
486
















487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
static const char *plugin = "test_decoding";

/* Global State */
static int	outfd = -1;
static volatile sig_atomic_t time_to_abort = false;
static volatile sig_atomic_t output_reopen = false;
static int64 output_last_fsync = -1;
static bool output_needs_fsync = false;
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;

static void usage(void);
static void StreamLog();
static void disconnect_and_exit(int code);

................................................................................
	output_last_fsync = now;

	output_fsync_lsn = output_written_lsn;

	if (fsync_interval <= 0)
		return true;

	if (!output_needs_fsync)
		return true;

	output_needs_fsync = false;

	/* Accept EINVAL, in case output is writing to a pipe or similar. */
	if (fsync(outfd) != 0 && errno != EINVAL)
	{
		fprintf(stderr,
				_("%s: could not fsync log file \"%s\": %s\n"),
				progname, outfile, strerror(errno));
................................................................................
		{
			/* Time to send feedback! */
			if (!sendFeedback(conn, now, true, false))
				goto error;

			last_status = now;
		}

		/* got SIGHUP, close output file */
		if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
		{
			now = feGetCurrentTimestamp();
			if (!OutputFsync(now))
				goto error;
			close(outfd);
			outfd = -1;
		}
		output_reopen = false;

		r = PQgetCopyData(conn, &copybuf, 1);
		if (r == 0)
		{
			/*
			 * In async mode, and no data available. We block on reading but
			 * not more than the specified timeout, so that we can send a
................................................................................

			/* Compute when we need to wakeup to send a keepalive message. */
			if (standby_message_timeout)
				message_target = last_status + (standby_message_timeout - 1) *
					((int64) 1000);

			/* Compute when we need to wakeup to fsync the output file. */
			if (fsync_interval > 0 && output_needs_fsync)
				fsync_target = output_last_fsync + (fsync_interval - 1) *
					((int64) 1000);

			/* Now compute when to wakeup. */
			if (message_target > 0 || fsync_target > 0)
			{
				int64		targettime;
................................................................................
		/* Extract WAL location for this block */
		{
			XLogRecPtr	temp = fe_recvint64(&copybuf[1]);

			output_written_lsn = Max(temp, output_written_lsn);
		}

		/* open the output file, if not open yet */
		if (outfd == -1)
		{
			if (strcmp(outfile, "-") == 0)
				outfd = fileno(stdout);
















			else
				outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
							 S_IRUSR | S_IWUSR);
			if (outfd == -1)
			{
				fprintf(stderr,
						_("%s: could not open log file \"%s\": %s\n"),
						progname, outfile, strerror(errno));
				goto error;
			}
		}

		bytes_left = r - hdr_len;
		bytes_written = 0;

		/* signal that a fsync is needed */
		output_needs_fsync = true;

		while (bytes_left)
		{
			int			ret;

			ret = write(outfd,
						copybuf + hdr_len + bytes_written,