pacemaker  2.0.3-4b1f869f0f
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2018 Andrew Beekhof <andrew@beekhof.net>
3  *
4  * This source code is licensed under the GNU Lesser General Public License
5  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
6  */
7 
8 #include <crm_internal.h>
9 #include <crm/crm.h>
10 
11 #include <sys/param.h>
12 #include <stdio.h>
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include <unistd.h>
16 #include <sys/socket.h>
17 #include <arpa/inet.h>
18 #include <netinet/in.h>
19 #include <netinet/ip.h>
20 #include <netinet/tcp.h>
21 #include <netdb.h>
22 #include <stdlib.h>
23 #include <errno.h>
24 #include <inttypes.h> /* X32T ~ PRIx32 */
25 
26 #include <glib.h>
27 #include <bzlib.h>
28 
29 #include <crm/common/ipcs.h>
30 #include <crm/common/xml.h>
31 #include <crm/common/mainloop.h>
33 
34 #ifdef HAVE_GNUTLS_GNUTLS_H
35 # undef KEYFILE
36 # include <gnutls/gnutls.h>
37 
38 const int psk_tls_kx_order[] = {
39  GNUTLS_KX_DHE_PSK,
40  GNUTLS_KX_PSK,
41 };
42 
43 const int anon_tls_kx_order[] = {
44  GNUTLS_KX_ANON_DH,
45  GNUTLS_KX_DHE_RSA,
46  GNUTLS_KX_DHE_DSS,
47  GNUTLS_KX_RSA,
48  0
49 };
50 #endif
51 
52 /* Swab macros from linux/swab.h */
53 #ifdef HAVE_LINUX_SWAB_H
54 # include <linux/swab.h>
55 #else
/*
 * Portable fallbacks for the byte-swapping helpers from linux/swab.h.
 *
 * The work is done by inline functions so that the argument is evaluated
 * exactly once, even if it has side effects (a pure macro expansion would
 * evaluate it once per byte). The __swabNN() names remain available as thin
 * macros; the casts mirror the truncation the helpers are expected to apply.
 */
static inline uint16_t
pcmk__swab16(uint16_t x)
{
    return (uint16_t) (((x & 0x00ffU) << 8) | ((x & 0xff00U) >> 8));
}

static inline uint32_t
pcmk__swab32(uint32_t x)
{
    return ((x & (uint32_t) 0x000000ffUL) << 24)
           | ((x & (uint32_t) 0x0000ff00UL) << 8)
           | ((x & (uint32_t) 0x00ff0000UL) >> 8)
           | ((x & (uint32_t) 0xff000000UL) >> 24);
}

static inline uint64_t
pcmk__swab64(uint64_t x)
{
    return ((x & (uint64_t) 0x00000000000000ffULL) << 56)
           | ((x & (uint64_t) 0x000000000000ff00ULL) << 40)
           | ((x & (uint64_t) 0x0000000000ff0000ULL) << 24)
           | ((x & (uint64_t) 0x00000000ff000000ULL) << 8)
           | ((x & (uint64_t) 0x000000ff00000000ULL) >> 8)
           | ((x & (uint64_t) 0x0000ff0000000000ULL) >> 24)
           | ((x & (uint64_t) 0x00ff000000000000ULL) >> 40)
           | ((x & (uint64_t) 0xff00000000000000ULL) >> 56);
}

#define __swab16(x) pcmk__swab16((uint16_t) (x))
#define __swab32(x) pcmk__swab32((uint32_t) (x))
#define __swab64(x) pcmk__swab64((uint64_t) (x))
79 #endif
80 
/* Header format version written into every message sent by this file */
81 #define REMOTE_MSG_VERSION 1
/* Marker stored in each header's endian field; a receiver that sees the
 * byte-swapped form of this value knows the sender has the opposite byte order
 */
82 #define ENDIAN_LOCAL 0xBADADBBD
83 
/* Fixed-size header that precedes the payload of every remote message.
 * The struct is packed, so its in-memory layout is exactly the field sequence
 * below with no padding; do not reorder or resize existing fields.
 */
84 struct crm_remote_header_v0
85 {
86  uint32_t endian; /* Detect messages from hosts with different endian-ness */
87  uint32_t version; /* Sender's REMOTE_MSG_VERSION */
88  uint64_t id; /* Sender-side message counter */
89  uint64_t flags; /* Not set by the sender in this file (left zeroed) */
90  uint32_t size_total; /* Header length plus payload length, in bytes */
91  uint32_t payload_offset; /* Byte offset of the payload from start of header */
92  uint32_t payload_compressed; /* Nonzero: bzip2-compressed payload size in bytes */
93  uint32_t payload_uncompressed; /* Payload size in bytes, including trailing NUL */
94 
95  /* New fields get added here */
96 
97 } __attribute__ ((packed));
98 
99 static struct crm_remote_header_v0 *
100 crm_remote_header(crm_remote_t * remote)
101 {
102  struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
103  if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
104  return NULL;
105 
106  } else if(header->endian != ENDIAN_LOCAL) {
107  uint32_t endian = __swab32(header->endian);
108 
110  if(endian != ENDIAN_LOCAL) {
111  crm_err("Invalid message detected, endian mismatch: %" X32T
112  " is neither %" X32T " nor the swab'd %" X32T,
113  ENDIAN_LOCAL, header->endian, endian);
114  return NULL;
115  }
116 
117  header->id = __swab64(header->id);
118  header->flags = __swab64(header->flags);
119  header->endian = __swab32(header->endian);
120 
121  header->version = __swab32(header->version);
122  header->size_total = __swab32(header->size_total);
123  header->payload_offset = __swab32(header->payload_offset);
124  header->payload_compressed = __swab32(header->payload_compressed);
125  header->payload_uncompressed = __swab32(header->payload_uncompressed);
126  }
127 
128  return header;
129 }
130 
131 #ifdef HAVE_GNUTLS_GNUTLS_H
132 
133 int
134 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
135 {
136  int rc = 0;
137  int pollrc = 0;
138  time_t start = time(NULL);
139 
140  do {
141  rc = gnutls_handshake(*remote->tls_session);
142  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
143  pollrc = crm_remote_ready(remote, 1000);
144  if (pollrc < 0) {
145  /* poll returned error, there is no hope */
146  rc = -1;
147  }
148  }
149 
150  } while (((time(NULL) - start) < (timeout_ms / 1000)) &&
151  (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
152 
153  if (rc < 0) {
154  crm_trace("gnutls_handshake() failed with %d", rc);
155  }
156  return rc;
157 }
158 
165 static void
166 pcmk__set_minimum_dh_bits(gnutls_session_t *session)
167 {
168  const char *dh_min_bits_s = getenv("PCMK_dh_min_bits");
169 
170  if (dh_min_bits_s) {
171  int dh_min_bits = crm_parse_int(dh_min_bits_s, "0");
172 
173  /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
174  * the priority string imply the DH requirements, but this is the only
175  * way to give the user control over compatibility with older servers.
176  */
177  if (dh_min_bits > 0) {
178  crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
179  dh_min_bits);
180  gnutls_dh_set_prime_bits(*session, dh_min_bits);
181  }
182  }
183 }
184 
/*!
 * \internal
 * \brief Clamp a Diffie-Hellman prime size to the user-configured limits
 *
 * The limits come from the PCMK_dh_min_bits and PCMK_dh_max_bits environment
 * variables. A limit that is unset or not positive is not applied, and a
 * maximum smaller than the minimum is ignored with a warning.
 *
 * \param[in] dh_bits  Desired prime size in bits
 *
 * \return \p dh_bits raised to the minimum or lowered to the maximum as needed
 */
static unsigned int
pcmk__bound_dh_bits(unsigned int dh_bits)
{
    const char *min_env = getenv("PCMK_dh_min_bits");
    const char *max_env = getenv("PCMK_dh_max_bits");
    int floor_bits = (min_env != NULL)? crm_parse_int(min_env, "0") : 0;
    int ceiling_bits = (max_env != NULL)? crm_parse_int(max_env, "0") : 0;

    if ((floor_bits > 0) && (ceiling_bits > 0) && (ceiling_bits < floor_bits)) {
        crm_warn("Ignoring PCMK_dh_max_bits because it is less than PCMK_dh_min_bits");
        ceiling_bits = 0;
    }

    /* The minimum wins over the maximum, so it is checked first */
    if ((floor_bits > 0) && (dh_bits < floor_bits)) {
        return floor_bits;
    }
    if ((ceiling_bits > 0) && (dh_bits > ceiling_bits)) {
        return ceiling_bits;
    }
    return dh_bits;
}
212 
225 pcmk__new_tls_session(int csock, unsigned int conn_type,
226  gnutls_credentials_type_t cred_type, void *credentials)
227 {
228  int rc = GNUTLS_E_SUCCESS;
229  const char *prio_base = NULL;
230  char *prio = NULL;
231  gnutls_session_t *session = NULL;
232 
233  /* Determine list of acceptable ciphers, etc. Pacemaker always adds the
234  * values required for its functionality.
235  *
236  * For an example of anonymous authentication, see:
237  * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
238  */
239 
240  prio_base = getenv("PCMK_tls_priorities");
241  if (prio_base == NULL) {
242  prio_base = PCMK_GNUTLS_PRIORITIES;
243  }
244  prio = crm_strdup_printf("%s:%s", prio_base,
245  (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
246 
247  session = gnutls_malloc(sizeof(gnutls_session_t));
248  if (session == NULL) {
249  rc = GNUTLS_E_MEMORY_ERROR;
250  goto error;
251  }
252 
253  rc = gnutls_init(session, conn_type);
254  if (rc != GNUTLS_E_SUCCESS) {
255  goto error;
256  }
257 
258  /* @TODO On the server side, it would be more efficient to cache the
259  * priority with gnutls_priority_init2() and set it with
260  * gnutls_priority_set() for all sessions.
261  */
262  rc = gnutls_priority_set_direct(*session, prio, NULL);
263  if (rc != GNUTLS_E_SUCCESS) {
264  goto error;
265  }
266  if (conn_type == GNUTLS_CLIENT) {
267  pcmk__set_minimum_dh_bits(session);
268  }
269 
270  gnutls_transport_set_ptr(*session,
271  (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
272 
273  rc = gnutls_credentials_set(*session, cred_type, credentials);
274  if (rc != GNUTLS_E_SUCCESS) {
275  goto error;
276  }
277  free(prio);
278  return session;
279 
280 error:
281  crm_err("Could not initialize %s TLS %s session: %s "
282  CRM_XS " rc=%d priority='%s'",
283  (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
284  (conn_type == GNUTLS_SERVER)? "server" : "client",
285  gnutls_strerror(rc), rc, prio);
286  free(prio);
287  if (session != NULL) {
288  gnutls_free(session);
289  }
290  return NULL;
291 }
292 
308 int
309 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
310 {
311  int rc = GNUTLS_E_SUCCESS;
312  unsigned int dh_bits = 0;
313 
314  rc = gnutls_dh_params_init(dh_params);
315  if (rc != GNUTLS_E_SUCCESS) {
316  goto error;
317  }
318 
319 #ifdef HAVE_GNUTLS_SEC_PARAM_TO_PK_BITS
320  dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
321  GNUTLS_SEC_PARAM_NORMAL);
322  if (dh_bits == 0) {
323  rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
324  goto error;
325  }
326 #else
327  dh_bits = 1024;
328 #endif
329  dh_bits = pcmk__bound_dh_bits(dh_bits);
330 
331  crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
332  dh_bits);
333  rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
334  if (rc != GNUTLS_E_SUCCESS) {
335  goto error;
336  }
337 
338  return rc;
339 
340 error:
341  crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
342  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
343  CRM_ASSERT(rc == GNUTLS_E_SUCCESS);
344  return rc;
345 }
346 
359 int
360 pcmk__read_handshake_data(crm_client_t *client)
361 {
362  int rc = 0;
363 
364  CRM_ASSERT(client && client->remote && client->remote->tls_session);
365 
366  do {
367  rc = gnutls_handshake(*client->remote->tls_session);
368  } while (rc == GNUTLS_E_INTERRUPTED);
369 
370  if (rc == GNUTLS_E_AGAIN) {
371  /* No more data is available at the moment. This function should be
372  * invoked again once the client sends more.
373  */
374  return 0;
375  } else if (rc != GNUTLS_E_SUCCESS) {
376  return rc;
377  }
378  return 1;
379 }
380 
381 static int
382 crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
383 {
384  const char *unsent = buf;
385  int rc = 0;
386  int total_send;
387 
388  if (buf == NULL) {
389  return -EINVAL;
390  }
391 
392  total_send = len;
393  crm_trace("Message size: %llu", (unsigned long long) len);
394 
395  while (TRUE) {
396  rc = gnutls_record_send(*session, unsent, len);
397 
398  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
399  crm_trace("Retrying to send %llu bytes",
400  (unsigned long long) len);
401 
402  } else if (rc < 0) {
403  // Caller can log as error if necessary
404  crm_info("TLS connection terminated: %s " CRM_XS " rc=%d",
405  gnutls_strerror(rc), rc);
406  rc = -ECONNABORTED;
407  break;
408 
409  } else if (rc < len) {
410  crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
411  len -= rc;
412  unsent += rc;
413  } else {
414  crm_trace("Sent all %d bytes", rc);
415  break;
416  }
417  }
418 
419  return rc < 0 ? rc : total_send;
420 }
421 #endif
422 
423 static int
424 crm_send_plaintext(int sock, const char *buf, size_t len)
425 {
426 
427  int rc = 0;
428  const char *unsent = buf;
429  int total_send;
430 
431  if (buf == NULL) {
432  return -EINVAL;
433  }
434  total_send = len;
435 
436  crm_trace("Message on socket %d: size=%llu",
437  sock, (unsigned long long) len);
438  retry:
439  rc = write(sock, unsent, len);
440  if (rc < 0) {
441  rc = -errno;
442  switch (errno) {
443  case EINTR:
444  case EAGAIN:
445  crm_trace("Retry");
446  goto retry;
447  default:
448  crm_perror(LOG_INFO,
449  "Could only write %d of the remaining %llu bytes",
450  rc, (unsigned long long) len);
451  break;
452  }
453 
454  } else if (rc < len) {
455  crm_trace("Only sent %d of %llu remaining bytes",
456  rc, (unsigned long long) len);
457  len -= rc;
458  unsent += rc;
459  goto retry;
460 
461  } else {
462  crm_trace("Sent %d bytes: %.100s", rc, buf);
463  }
464 
465  return rc < 0 ? rc : total_send;
466 
467 }
468 
469 static int
470 crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
471 {
472  int rc = 0;
473 
474  for (int lpc = 0; (lpc < iovs) && (rc >= 0); lpc++) {
475 #ifdef HAVE_GNUTLS_GNUTLS_H
476  if (remote->tls_session) {
477  rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
478  continue;
479  }
480 #endif
481  if (remote->tcp_socket) {
482  rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
483  } else {
484  rc = -ESOCKTNOSUPPORT;
485  }
486  }
487  return rc;
488 }
489 
490 int
491 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
492 {
493  int rc = pcmk_ok;
494  static uint64_t id = 0;
495  char *xml_text = dump_xml_unformatted(msg);
496 
497  struct iovec iov[2];
498  struct crm_remote_header_v0 *header;
499 
500  if (xml_text == NULL) {
501  crm_err("Could not send remote message: no message provided");
502  return -EINVAL;
503  }
504 
505  header = calloc(1, sizeof(struct crm_remote_header_v0));
506  iov[0].iov_base = header;
507  iov[0].iov_len = sizeof(struct crm_remote_header_v0);
508 
509  iov[1].iov_base = xml_text;
510  iov[1].iov_len = 1 + strlen(xml_text);
511 
512  id++;
513  header->id = id;
514  header->endian = ENDIAN_LOCAL;
515  header->version = REMOTE_MSG_VERSION;
516  header->payload_offset = iov[0].iov_len;
517  header->payload_uncompressed = iov[1].iov_len;
518  header->size_total = iov[0].iov_len + iov[1].iov_len;
519 
520  crm_trace("Sending len[0]=%d, start=%x",
521  (int)iov[0].iov_len, *(int*)(void*)xml_text);
522  rc = crm_remote_sendv(remote, iov, 2);
523  if (rc < 0) {
524  crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
525  pcmk_strerror(rc), rc);
526  }
527 
528  free(iov[0].iov_base);
529  free(iov[1].iov_base);
530  return rc;
531 }
532 
533 
539 xmlNode *
541 {
542  xmlNode *xml = NULL;
543  struct crm_remote_header_v0 *header = crm_remote_header(remote);
544 
545  if (remote->buffer == NULL || header == NULL) {
546  return NULL;
547  }
548 
549  /* Support compression on the receiving end now, in case we ever want to add it later */
550  if (header->payload_compressed) {
551  int rc = 0;
552  unsigned int size_u = 1 + header->payload_uncompressed;
553  char *uncompressed = calloc(1, header->payload_offset + size_u);
554 
555  crm_trace("Decompressing message data %d bytes into %d bytes",
556  header->payload_compressed, size_u);
557 
558  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
559  remote->buffer + header->payload_offset,
560  header->payload_compressed, 1, 0);
561 
562  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
563  crm_warn("Couldn't decompress v%d message, we only understand v%d",
564  header->version, REMOTE_MSG_VERSION);
565  free(uncompressed);
566  return NULL;
567 
568  } else if (rc != BZ_OK) {
569  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
570  bz2_strerror(rc), rc);
571  free(uncompressed);
572  return NULL;
573  }
574 
575  CRM_ASSERT(size_u == header->payload_uncompressed);
576 
577  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
578  remote->buffer_size = header->payload_offset + size_u;
579 
580  free(remote->buffer);
581  remote->buffer = uncompressed;
582  header = crm_remote_header(remote);
583  }
584 
585  /* take ownership of the buffer */
586  remote->buffer_offset = 0;
587 
588  CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
589 
590  xml = string2xml(remote->buffer + header->payload_offset);
591  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
592  crm_warn("Couldn't parse v%d message, we only understand v%d",
593  header->version, REMOTE_MSG_VERSION);
594 
595  } else if (xml == NULL) {
596  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
597  }
598 
599  return xml;
600 }
601 
611 int
612 crm_remote_ready(crm_remote_t *remote, int total_timeout)
613 {
614  struct pollfd fds = { 0, };
615  int sock = 0;
616  int rc = 0;
617  time_t start;
618  int timeout = total_timeout;
619 
620 #ifdef HAVE_GNUTLS_GNUTLS_H
621  if (remote->tls_session) {
622  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
623 
624  sock = GPOINTER_TO_INT(sock_ptr);
625  } else if (remote->tcp_socket) {
626 #else
627  if (remote->tcp_socket) {
628 #endif
629  sock = remote->tcp_socket;
630  } else {
631  crm_err("Unsupported connection type");
632  }
633 
634  if (sock <= 0) {
635  crm_trace("No longer connected");
636  return -ENOTCONN;
637  }
638 
639  start = time(NULL);
640  errno = 0;
641  do {
642  fds.fd = sock;
643  fds.events = POLLIN;
644 
645  /* If we got an EINTR while polling, and we have a
646  * specific timeout we are trying to honor, attempt
647  * to adjust the timeout to the closest second. */
648  if (errno == EINTR && (timeout > 0)) {
649  timeout = total_timeout - ((time(NULL) - start) * 1000);
650  if (timeout < 1000) {
651  timeout = 1000;
652  }
653  }
654 
655  rc = poll(&fds, 1, timeout);
656  } while (rc < 0 && errno == EINTR);
657 
658  return (rc < 0)? -errno : rc;
659 }
660 
661 
672 static size_t
673 crm_remote_recv_once(crm_remote_t * remote)
674 {
675  int rc = 0;
676  size_t read_len = sizeof(struct crm_remote_header_v0);
677  struct crm_remote_header_v0 *header = crm_remote_header(remote);
678 
679  if(header) {
680  /* Stop at the end of the current message */
681  read_len = header->size_total;
682  }
683 
684  /* automatically grow the buffer when needed */
685  if(remote->buffer_size < read_len) {
686  remote->buffer_size = 2 * read_len;
687  crm_trace("Expanding buffer to %llu bytes",
688  (unsigned long long) remote->buffer_size);
689 
690  remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
691  CRM_ASSERT(remote->buffer != NULL);
692  }
693 
694 #ifdef HAVE_GNUTLS_GNUTLS_H
695  if (remote->tls_session) {
696  rc = gnutls_record_recv(*(remote->tls_session),
697  remote->buffer + remote->buffer_offset,
698  remote->buffer_size - remote->buffer_offset);
699  if (rc == GNUTLS_E_INTERRUPTED) {
700  rc = -EINTR;
701  } else if (rc == GNUTLS_E_AGAIN) {
702  rc = -EAGAIN;
703  } else if (rc < 0) {
704  crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
705  rc = -pcmk_err_generic;
706  }
707  } else if (remote->tcp_socket) {
708 #else
709  if (remote->tcp_socket) {
710 #endif
711  errno = 0;
712  rc = read(remote->tcp_socket,
713  remote->buffer + remote->buffer_offset,
714  remote->buffer_size - remote->buffer_offset);
715  if(rc < 0) {
716  rc = -errno;
717  }
718 
719  } else {
720  crm_err("Unsupported connection type");
721  return -ESOCKTNOSUPPORT;
722  }
723 
724  /* process any errors. */
725  if (rc > 0) {
726  remote->buffer_offset += rc;
727  /* always null terminate buffer, the +1 to alloc always allows for this. */
728  remote->buffer[remote->buffer_offset] = '\0';
729  crm_trace("Received %u more bytes, %llu total",
730  rc, (unsigned long long) remote->buffer_offset);
731 
732  } else if (rc == -EINTR || rc == -EAGAIN) {
733  crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
734 
735  } else if (rc == 0) {
736  crm_debug("EOF encoutered after %llu bytes",
737  (unsigned long long) remote->buffer_offset);
738  return -ENOTCONN;
739 
740  } else {
741  crm_debug("Error receiving message after %llu bytes: %s (%d)",
742  (unsigned long long) remote->buffer_offset,
743  pcmk_strerror(rc), rc);
744  return -ENOTCONN;
745  }
746 
747  header = crm_remote_header(remote);
748  if(header) {
749  if(remote->buffer_offset < header->size_total) {
750  crm_trace("Read less than the advertised length: %llu < %u bytes",
751  (unsigned long long) remote->buffer_offset,
752  header->size_total);
753  } else {
754  crm_trace("Read full message of %llu bytes",
755  (unsigned long long) remote->buffer_offset);
756  return remote->buffer_offset;
757  }
758  }
759 
760  return -EAGAIN;
761 }
762 
773 gboolean
774 crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
775 {
776  int rc;
777  time_t start = time(NULL);
778  int remaining_timeout = 0;
779 
780  if (total_timeout == 0) {
781  total_timeout = 10000;
782  } else if (total_timeout < 0) {
783  total_timeout = 60000;
784  }
785  *disconnected = 0;
786 
787  remaining_timeout = total_timeout;
788  while ((remaining_timeout > 0) && !(*disconnected)) {
789 
790  crm_trace("Waiting for remote data (%d of %d ms timeout remaining)",
791  remaining_timeout, total_timeout);
792  rc = crm_remote_ready(remote, remaining_timeout);
793 
794  if (rc == 0) {
795  crm_err("Timed out (%d ms) while waiting for remote data",
796  remaining_timeout);
797  return FALSE;
798 
799  } else if (rc < 0) {
800  crm_debug("Wait for remote data aborted, will try again: %s "
801  CRM_XS " rc=%d", pcmk_strerror(rc), rc);
802 
803  } else {
804  rc = crm_remote_recv_once(remote);
805  if (rc > 0) {
806  return TRUE;
807  } else if (rc == -EAGAIN) {
808  crm_trace("Still waiting for remote data");
809  } else if (rc < 0) {
810  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
811  pcmk_strerror(rc), rc);
812  }
813  }
814 
815  if (rc == -ENOTCONN) {
816  *disconnected = 1;
817  return FALSE;
818  }
819 
820  remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
821  }
822 
823  return FALSE;
824 }
825 
/* State carried between scheduling an asynchronous TCP connect and the
 * periodic timer callback that checks whether it has finished
 */
826 struct tcp_async_cb_data {
827  gboolean success; /* TRUE if connect() already succeeded when scheduled */
828  int sock; /* Socket being connected */
829  void *userdata; /* Passed through unchanged to the callback */
830  void (*callback) (void *userdata, int sock); /* Gets the socket on success, -errno on error */
831  int timeout; /*ms */
832  time_t start; /* When the connection attempt was scheduled */
833 };
834 
835 static gboolean
836 check_connect_finished(gpointer userdata)
837 {
838  struct tcp_async_cb_data *cb_data = userdata;
839  int cb_arg = 0; // socket fd on success, -errno on error
840  int sock = cb_data->sock;
841  int error = 0;
842 
843  fd_set rset, wset;
844  socklen_t len = sizeof(error);
845  struct timeval ts = { 0, };
846 
847  if (cb_data->success == TRUE) {
848  goto dispatch_done;
849  }
850 
851  FD_ZERO(&rset);
852  FD_SET(sock, &rset);
853  wset = rset;
854 
855  crm_trace("fd %d: checking to see if connect finished", sock);
856  cb_arg = select(sock + 1, &rset, &wset, NULL, &ts);
857 
858  if (cb_arg < 0) {
859  cb_arg = -errno;
860  if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
861  /* reschedule if there is still time left */
862  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
863  goto reschedule;
864  } else {
865  cb_arg = -ETIMEDOUT;
866  }
867  }
868  crm_trace("fd %d: select failed %d connect dispatch ", sock, cb_arg);
869  goto dispatch_done;
870  } else if (cb_arg == 0) {
871  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
872  goto reschedule;
873  }
874  crm_debug("fd %d: timeout during select", sock);
875  cb_arg = -ETIMEDOUT;
876  goto dispatch_done;
877  } else {
878  crm_trace("fd %d: select returned success", sock);
879  cb_arg = 0;
880  }
881 
882  /* can we read or write to the socket now? */
883  if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
884  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
885  cb_arg = -errno;
886  crm_trace("fd %d: call to getsockopt failed", sock);
887  goto dispatch_done;
888  }
889  if (error) {
890  crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
891  cb_arg = -error;
892  goto dispatch_done;
893  }
894  } else {
895  crm_trace("neither read nor write set after select");
896  cb_arg = -EAGAIN;
897  goto dispatch_done;
898  }
899 
900  dispatch_done:
901  if (!cb_arg) {
902  crm_trace("fd %d: connected", sock);
903  /* Success, set the return code to the sock to report to the callback */
904  cb_arg = cb_data->sock;
905  cb_data->sock = 0;
906  } else {
907  close(sock);
908  }
909 
910  if (cb_data->callback) {
911  cb_data->callback(cb_data->userdata, cb_arg);
912  }
913  free(cb_data);
914  return FALSE;
915 
916  reschedule:
917 
918  /* will check again next interval */
919  return TRUE;
920 }
921 
922 static int
923 internal_tcp_connect_async(int sock,
924  const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
925  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
926 {
927  int rc = 0;
928  int interval = 500;
929  int timer;
930  struct tcp_async_cb_data *cb_data = NULL;
931 
932  rc = crm_set_nonblocking(sock);
933  if (rc < 0) {
934  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
935  pcmk_strerror(rc), rc);
936  close(sock);
937  return -1;
938  }
939 
940  rc = connect(sock, addr, addrlen);
941  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
942  crm_perror(LOG_WARNING, "connect");
943  return -1;
944  }
945 
946  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
947  cb_data->userdata = userdata;
948  cb_data->callback = callback;
949  cb_data->sock = sock;
950  cb_data->timeout = timeout;
951  cb_data->start = time(NULL);
952 
953  if (rc == 0) {
954  /* The connect was successful immediately, we still return to mainloop
955  * and let this callback get called later. This avoids the user of this api
956  * to have to account for the fact the callback could be invoked within this
957  * function before returning. */
958  cb_data->success = TRUE;
959  interval = 1;
960  }
961 
962  /* Check connect finished is mostly doing a non-block poll on the socket
963  * to see if we can read/write to it. Once we can, the connect has completed.
964  * This method allows us to connect to the server without blocking mainloop.
965  *
966  * This is a poor man's way of polling to see when the connection finished.
967  * At some point we should figure out a way to use a mainloop fd callback for this.
968  * Something about the way mainloop is currently polling prevents this from working at the
969  * moment though. */
970  crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
971  interval, sock);
972  timer = g_timeout_add(interval, check_connect_finished, cb_data);
973  if (timer_id) {
974  *timer_id = timer;
975  }
976 
977  return 0;
978 }
979 
980 static int
981 internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
982 {
983  int rc = connect(sock, addr, addrlen);
984 
985  if (rc < 0) {
986  rc = -errno;
987  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
988  pcmk_strerror(rc), rc);
989  return rc;
990  }
991 
992  rc = crm_set_nonblocking(sock);
993  if (rc < 0) {
994  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
995  pcmk_strerror(rc), rc);
996  return rc;
997  }
998 
999  return pcmk_ok;
1000 }
1001 
/*!
 * \brief Connect to a host over TCP, optionally without blocking
 *
 * The host name is resolved and each resulting address is tried in order.
 * With a callback, the connection is started asynchronously and its outcome
 * is reported through the callback; without one, the connection is made
 * synchronously.
 *
 * \param[in]  host      Name or address of the host to connect to
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   Time limit for an asynchronous attempt (ms)
 * \param[out] timer_id  If not NULL, receives the timer source ID of an
 *                       asynchronous attempt
 * \param[in]  userdata  Passed through to \p callback
 * \param[in]  callback  If not NULL, connect asynchronously and call this
 *                       with the socket on success or -errno on error
 *
 * \return Socket file descriptor (connected, or still connecting in the
 *         asynchronous case), or -ENOTCONN if no address could be used
 */
int
crm_remote_tcp_connect_async(const char *host, int port, int timeout,
                             int *timer_id, void *userdata,
                             void (*callback) (void *userdata, int sock))
{
    char addr_text[INET6_ADDRSTRLEN];
    struct addrinfo *candidates = NULL;
    struct addrinfo *candidate = NULL;
    struct addrinfo hints = {
        .ai_family = AF_UNSPEC,         /* Allow IPv4 or IPv6 */
        .ai_socktype = SOCK_STREAM,
        .ai_flags = AI_CANONNAME,
    };
    const char *server = host;
    int lookup_rc = 0;
    int sock = -ENOTCONN;

    // Get host's IP address(es)
    lookup_rc = getaddrinfo(server, NULL, &hints, &candidates);
    if (lookup_rc) {
        crm_err("Unable to get IP address info for %s: %s",
                server, gai_strerror(lookup_rc));
        goto done;
    }
    if (!candidates || !candidates->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        goto done;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (candidate = candidates; candidate != NULL;
         candidate = candidate->ai_next) {

        struct sockaddr *addr = candidate->ai_addr;
        int connect_rc = 0;

        if (!addr) {
            continue;
        }

        if (candidate->ai_canonname) {
            server = candidates->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(candidate->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            crm_perror(LOG_WARNING, "creating socket for connection to %s",
                       server);
            sock = -ENOTCONN;
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(addr_text, 0, sizeof(addr_text));
        crm_sockaddr2str(addr, addr_text);
        crm_info("Attempting TCP connection to %s:%d", addr_text, port);

        if (callback) {
            /* On success here, we'll hear back later in the callback */
            connect_rc = internal_tcp_connect_async(sock, candidate->ai_addr,
                                                    candidate->ai_addrlen,
                                                    timeout, timer_id,
                                                    userdata, callback);
        } else {
            connect_rc = internal_tcp_connect(sock, candidate->ai_addr,
                                              candidate->ai_addrlen);
        }

        if (connect_rc == 0) {
            break; /* Success (for now, in the asynchronous case) */
        }

        close(sock);
        sock = -ENOTCONN;
    }

done:
    if (candidates) {
        freeaddrinfo(candidates);
    }
    return sock;
}
1098 
/*!
 * \brief Connect to a host over TCP synchronously
 *
 * \param[in] host  Name or address of the host to connect to
 * \param[in] port  TCP port to connect to
 *
 * \return Whatever crm_remote_tcp_connect_async() returns when called without
 *         a callback
 */
int
crm_remote_tcp_connect(const char *host, int port)
{
    /* No callback selects the synchronous code path, so the timeout, timer ID
     * and user data are unused
     */
    int sock = crm_remote_tcp_connect_async(host, port, -1, NULL, NULL, NULL);

    return sock;
}
1104 
/*!
 * \brief Convert an IP address (IPv4 or IPv6) to a string for logging
 *
 * \param[in]  sa  Socket address (struct sockaddr_in or struct sockaddr_in6)
 * \param[out] s   Where to store the result; must have room for at least
 *                 INET6_ADDRSTRLEN bytes
 *
 * \note The result is always a valid NUL-terminated string: "<invalid>" is
 *       stored if \p sa is NULL, has an unsupported address family, or cannot
 *       be converted. Nothing is done if \p s is NULL.
 */
void
crm_sockaddr2str(void *sa, char *s)
{
    const char *converted = NULL;

    if (s == NULL) {
        return;
    }

    if (sa != NULL) {
        switch (((struct sockaddr*)sa)->sa_family) {
            case AF_INET:
                converted = inet_ntop(AF_INET,
                                      &(((struct sockaddr_in *)sa)->sin_addr),
                                      s, INET6_ADDRSTRLEN);
                break;

            case AF_INET6:
                converted = inet_ntop(AF_INET6,
                                      &(((struct sockaddr_in6 *)sa)->sin6_addr),
                                      s, INET6_ADDRSTRLEN);
                break;

            default:
                break;
        }
    }

    if (converted == NULL) {
        /* inet_ntop() does not define the buffer contents on failure */
        strcpy(s, "<invalid>");
    }
}
1133 
1134 int
1136 {
1137  int csock = 0;
1138  int rc = 0;
1139  unsigned laddr = 0;
1140  struct sockaddr_storage addr;
1141  char addr_str[INET6_ADDRSTRLEN];
1142 #ifdef TCP_USER_TIMEOUT
1143  int optval;
1144  long sbd_timeout = crm_get_sbd_timeout();
1145 #endif
1146 
1147  /* accept the connection */
1148  laddr = sizeof(addr);
1149  memset(&addr, 0, sizeof(addr));
1150  csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1151  crm_sockaddr2str(&addr, addr_str);
1152  crm_info("New remote connection from %s", addr_str);
1153 
1154  if (csock == -1) {
1155  crm_err("accept socket failed");
1156  return -1;
1157  }
1158 
1159  rc = crm_set_nonblocking(csock);
1160  if (rc < 0) {
1161  crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1162  pcmk_strerror(rc), rc);
1163  close(csock);
1164  return rc;
1165  }
1166 
1167 #ifdef TCP_USER_TIMEOUT
1168  if (sbd_timeout > 0) {
1169  optval = sbd_timeout / 2; /* time to fail and retry before watchdog */
1170  rc = setsockopt(csock, SOL_TCP, TCP_USER_TIMEOUT,
1171  &optval, sizeof(optval));
1172  if (rc < 0) {
1173  crm_err("setting TCP_USER_TIMEOUT (%d) on client socket failed",
1174  optval);
1175  close(csock);
1176  return rc;
1177  }
1178  }
1179 #endif
1180 
1181  return csock;
1182 }
1183 
1189 int
1191 {
1192  static int port = 0;
1193 
1194  if (port == 0) {
1195  const char *env = getenv("PCMK_remote_port");
1196 
1197  if (env) {
1198  errno = 0;
1199  port = strtol(env, NULL, 10);
1200  if (errno || (port < 1) || (port > 65535)) {
1201  crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
1202  env, DEFAULT_REMOTE_PORT);
1203  port = DEFAULT_REMOTE_PORT;
1204  }
1205  } else {
1206  port = DEFAULT_REMOTE_PORT;
1207  }
1208  }
1209  return port;
1210 }
PCMK_GNUTLS_PRIORITIES
#define PCMK_GNUTLS_PRIORITIES
Definition: config.h:541
remote_internal.h
payload_uncompressed
uint32_t payload_uncompressed
Definition: remote.c:9
dump_xml_unformatted
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3306
pcmk_err_generic
#define pcmk_err_generic
Definition: results.h:60
ipcs.h
payload_compressed
uint32_t payload_compressed
Definition: remote.c:8
flags
uint64_t flags
Definition: remote.c:5
crm_remote_tcp_connect
int crm_remote_tcp_connect(const char *host, int port)
Definition: remote.c:1100
pcmk_strerror
const char * pcmk_strerror(int rc)
Definition: results.c:188
crm_remote_recv
gboolean crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
Definition: remote.c:774
crm_err
#define crm_err(fmt, args...)
Definition: logging.h:241
endian
uint32_t endian
Definition: remote.c:2
crm_trace
#define crm_trace(fmt, args...)
Definition: logging.h:247
crm_warn
#define crm_warn(fmt, args...)
Definition: logging.h:242
__attribute__
struct tcp_async_cb_data __attribute__
xml.h
Wrappers for and extensions to libxml2.
crm_remote_s::buffer_offset
size_t buffer_offset
Definition: ipcs.h:41
crm_remote_s
Definition: ipcs.h:37
X32T
#define X32T
Definition: config.h:642
mainloop.h
Wrappers for and extensions to glib mainloop.
crm_sockaddr2str
void crm_sockaddr2str(void *sa, char *s)
Convert an IP address (IPv4 or IPv6) to a string for logging.
Definition: remote.c:1116
crm_remote_parse_buffer
xmlNode * crm_remote_parse_buffer(crm_remote_t *remote)
Definition: remote.c:540
crm_remote_tcp_connect_async
int crm_remote_tcp_connect_async(const char *host, int port, int timeout, int *timer_id, void *userdata, void(*callback)(void *userdata, int sock))
Definition: remote.c:1016
crm_info
#define crm_info(fmt, args...)
Definition: logging.h:244
CRM_LOG_ASSERT
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:143
CRM_XS
#define CRM_XS
Definition: logging.h:34
crm_remote_s::tcp_socket
int tcp_socket
Definition: ipcs.h:43
crm_strdup_printf
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
crm_remote_accept
int crm_remote_accept(int ssock)
Definition: remote.c:1135
DIMOF
#define DIMOF(a)
Definition: crm.h:58
crm_debug
#define crm_debug(fmt, args...)
Definition: logging.h:246
gnutls_session_t
void gnutls_session_t
Definition: cib_remote.c:42
string2xml
xmlNode * string2xml(const char *input)
Definition: xml.c:2174
id
uint64_t id
Definition: remote.c:4
crm_remote_ready
int crm_remote_ready(crm_remote_t *remote, int total_timeout)
Definition: remote.c:612
__swab64
#define __swab64(x)
Definition: remote.c:70
crm_default_remote_port
int crm_default_remote_port()
Get the default remote connection TCP port on this host.
Definition: remote.c:1190
crm_set_nonblocking
int crm_set_nonblocking(int fd)
Definition: io.c:485
crm_parse_int
int crm_parse_int(const char *text, const char *default_text)
Parse an integer value from a string.
Definition: strings.c:114
crm_perror
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:219
crm_remote_s::buffer_size
size_t buffer_size
Definition: ipcs.h:40
host
AIS_Host host
Definition: internal.h:6
payload_offset
uint32_t payload_offset
Definition: remote.c:7
__swab32
#define __swab32(x)
Definition: remote.c:64
crm_remote_s::buffer
char * buffer
Definition: ipcs.h:39
REMOTE_MSG_VERSION
#define REMOTE_MSG_VERSION
Definition: remote.c:81
crm_remote_send
int crm_remote_send(crm_remote_t *remote, xmlNode *msg)
Definition: remote.c:491
CRM_ASSERT
#define CRM_ASSERT(expr)
Definition: results.h:42
size_total
uint32_t size_total
Definition: remote.c:6
crm_client_s::remote
struct crm_remote_s * remote
Definition: ipcs.h:91
crm_client_s
Definition: ipcs.h:63
version
uint32_t version
Definition: remote.c:3
ENDIAN_LOCAL
#define ENDIAN_LOCAL
Definition: remote.c:82
crm_get_sbd_timeout
long crm_get_sbd_timeout(void)
Definition: watchdog.c:215
crm_internal.h
DEFAULT_REMOTE_PORT
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:50
crm.h
A dumping ground.
pcmk_ok
#define pcmk_ok
Definition: results.h:57
bz2_strerror
const char * bz2_strerror(int rc)
Definition: results.c:445