Commit 184fafc20c for asterisk.org
commit 184fafc20c4b361742161f9e93197517d9d626e1
Author: George Joseph <gjoseph@sangoma.com>
Date: Tue May 12 16:04:03 2026 -0600
WebSocket Enhancements: Proxies and Keepalives for ARI and Media Outbound Websockets.
See the notes below for high-level descriptions of the new features.
* Proxies
Outbound/forward HTTP proxies are now supported and configurable in
websocket_client.conf. You can specify a host:port plus optional proxy_username
and proxy_password. Because WebSockets aren't consistently supported among
proxies (specifically passing through UPGRADEs), the CONNECT method is always
used to establish a TCP tunnel through the proxy. This is required if a TLS
session is to be established with the WebSocket server anyway. It's important
to understand that the negotiation with the proxy is ALWAYS unsecured. Once
the proxy establishes the tunnel, the TLS session will be negotiated directly
with the remote WebSocket server via the tunnel.
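
As a rough illustration of the tunnel setup described above, the sketch below
formats the HTTP CONNECT request a client would send to the proxy before any
TLS negotiation. It is not the Asterisk implementation: the helper names
base64_encode and build_connect_request are hypothetical, and the use of the
"Basic" scheme for proxy_username/proxy_password is an assumption.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: standard Base64 (RFC 4648) encoding of src into dst.
 * dst must hold at least 4 * ((len + 2) / 3) + 1 bytes. */
static void base64_encode(const unsigned char *src, size_t len, char *dst)
{
	static const char tbl[] =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
	size_t i, o = 0;

	/* Full 3-byte groups become 4 output characters. */
	for (i = 0; i + 2 < len; i += 3) {
		dst[o++] = tbl[src[i] >> 2];
		dst[o++] = tbl[((src[i] & 0x03) << 4) | (src[i + 1] >> 4)];
		dst[o++] = tbl[((src[i + 1] & 0x0f) << 2) | (src[i + 2] >> 6)];
		dst[o++] = tbl[src[i + 2] & 0x3f];
	}
	/* A trailing group of 1 or 2 bytes is padded with '='. */
	if (i < len) {
		dst[o++] = tbl[src[i] >> 2];
		if (i + 1 < len) {
			dst[o++] = tbl[((src[i] & 0x03) << 4) | (src[i + 1] >> 4)];
			dst[o++] = tbl[(src[i + 1] & 0x0f) << 2];
		} else {
			dst[o++] = tbl[(src[i] & 0x03) << 4];
			dst[o++] = '=';
		}
		dst[o++] = '=';
	}
	dst[o] = '\0';
}

/* Hypothetical helper: format a CONNECT request for "host:port".
 * user/pass may be NULL (no Proxy-Authorization header is sent then).
 * Returns the request length, or -1 if a buffer is too small. */
static int build_connect_request(char *buf, size_t size, const char *target,
	const char *user, const char *pass)
{
	char creds[256];
	char encoded[4 * ((sizeof(creds) + 2) / 3) + 1];
	int n;

	if (!user || !pass) {
		n = snprintf(buf, size,
			"CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n", target, target);
	} else {
		n = snprintf(creds, sizeof(creds), "%s:%s", user, pass);
		if (n < 0 || (size_t) n >= sizeof(creds)) {
			return -1;
		}
		/* Assumption: credentials are sent with the Basic scheme. */
		base64_encode((const unsigned char *) creds, (size_t) n, encoded);
		n = snprintf(buf, size,
			"CONNECT %s HTTP/1.1\r\nHost: %s\r\n"
			"Proxy-Authorization: Basic %s\r\n\r\n",
			target, target, encoded);
	}
	return (n < 0 || (size_t) n >= size) ? -1 : n;
}
```

The request and the proxy's reply travel in the clear, which is why the proxy
credentials should be treated as exposed on that hop. Only after the proxy
answers with a success status would the client begin the TLS handshake with
the WebSocket server through the tunnel.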
* Keepalives
Both TCP-level and WebSocket PING/PONG keepalives can be configured and are
available with either the curl or tcptls client implementations. The TCP
keepalives are handled entirely by the operating system and require no
resources from Asterisk but by their very nature, they can't traverse proxies.
WebSocket PING/PONGs are implemented in the Asterisk websocket code and require
a scheduler thread to keep track of them so they're a bit more complicated but
they do traverse proxies. Which one is used is completely up to the admin.
You could use both.
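
To make the TCP-level option more concrete, here is a minimal sketch of
enabling kernel keepalives on a connected socket. It assumes the Linux
TCP_KEEPIDLE, TCP_KEEPINTVL and TCP_KEEPCNT socket options; whether Asterisk
sets them exactly this way is not shown in this part of the commit. The helper
names enable_tcp_keepalives and detection_window are hypothetical. The second
helper simply evaluates the detection window quoted in
websocket_client.conf.sample.

```c
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/* Hypothetical helper: enable kernel TCP keepalives on an existing socket.
 * Returns 0 on success, -1 on failure (errno is set by setsockopt). */
int enable_tcp_keepalives(int fd, int idle_secs, int interval_secs, int probes)
{
	int on = 1;

	if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)) < 0
		|| setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle_secs, sizeof(idle_secs)) < 0
		|| setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval_secs, sizeof(interval_secs)) < 0
		|| setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &probes, sizeof(probes)) < 0) {
		return -1;
	}
	return 0;
}

/* Detection window from the sample config: between (probes * interval)
 * and ((probes + 1) * interval) seconds. */
void detection_window(unsigned int interval, unsigned int probes,
	unsigned int *min_secs, unsigned int *max_secs)
{
	*min_secs = probes * interval;
	*max_secs = (probes + 1) * interval;
}
```

With the sample defaults (interval 20, probes 3), the window works out to
between 60 and 80 seconds. The same arithmetic applies to pingpong_interval
and pingpong_probes.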
* Other Changes
A few changes were needed to res/ari/ari_websockets and
res/res_aeap/transport_websocket to add explicit calls to ast_websocket_close.
They had been assuming that the websocket session destructor would close the
websocket when it unreffed it but the keepalive process now holds a reference
so the destructor wouldn't actually run without the call to ast_websocket_close
to stop the keepalives.
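
The reference-count interaction described above can be modelled with a toy
object. This is only a sketch: struct toy_ws and the toy_* functions are
hypothetical stand-ins, not the ast_websocket API, and the "destructor" is
reduced to setting a flag.

```c
#include <stdbool.h>

/* Toy stand-in for a reference-counted websocket session. */
struct toy_ws {
	int refcount;          /* starts at 1 for the creator's reference */
	bool keepalive_active; /* true while the keepalive holds a reference */
	bool destroyed;        /* set when the "destructor" has run */
};

/* Drop one reference; the destructor runs only when the count hits zero. */
static void toy_unref(struct toy_ws *ws)
{
	if (--ws->refcount == 0) {
		ws->destroyed = true;
	}
}

/* Starting keepalives takes an extra reference on the session. */
static void toy_start_keepalive(struct toy_ws *ws)
{
	ws->refcount++;
	ws->keepalive_active = true;
}

/* Closing cancels the keepalive and releases the reference it held. */
static void toy_close(struct toy_ws *ws)
{
	if (ws->keepalive_active) {
		ws->keepalive_active = false;
		toy_unref(ws);
	}
}
```

In this model, calling only toy_unref() on a session with keepalives running
leaves the count at 1 and the object is never destroyed. Calling toy_close()
first and then toy_unref() brings the count to zero, which matches the
close-then-unref order the \warning notes in this commit ask callers to
follow.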
A few new methods were added to tcptls.c to allow switching an existing
connection from unsecured to TLS. These were required because the initial
connection and handshake with a proxy is always unsecured but then needs
to be switched to TLS if required for the remote WebSocket server.
There was a bug in sorcery.h where the ast_sorcery_register_uint macro
was referencing _stringify (which doesn't exist) instead of _sorcery_stringify.
Resolves: #1881
Resolves: #1933
UserNote: Forward/outbound proxies can now be specified for outbound websockets.
See the websocket_client.conf.sample file for configuration information.
UserNote: TCP-level or WebSocket PING/PONG keepalives can now be enabled on
outbound websockets. They can help detect network failures even when a
persistent connection is idle. See the websocket_client.conf.sample
file for configuration information.
DeveloperNote: The addition of the proxy and keepalive configuration parameters
pushed the websocket client parameter count over 32. This necessitated changing
the size of the ast_ws_client_fields enum from a 32-bit bitfield to a 64-bit
bitfield with a corresponding change to the ast_websocket_client structure.
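
The need for the wider bitfield can be illustrated with a small sketch. The
DEMO_* names and the two functions are hypothetical. The bit position mirrors
AST_WS_CLIENT_FIELD_USER_START (30) plus 4, which is where the diff places
ARI_OWC_FIELD_SUBSCRIBE_ALL.

```c
#include <stdint.h>

/* Mirrors AST_WS_CLIENT_FIELD_USER_START from the diff. */
#define DEMO_USER_START 30

/* Hypothetical stand-in for a flag at bit 34. The ULL suffix matters:
 * shifting a plain int by 34 is undefined behavior in C. */
#define DEMO_FIELD_HIGH (1ULL << (DEMO_USER_START + 4))

/* Returns nonzero if 'field' is still set after being stored in a
 * 32-bit mask; the upper 32 bits are discarded by the conversion. */
static int survives_32bit(uint64_t field)
{
	uint32_t narrow = (uint32_t) field;

	return narrow != 0;
}

/* Returns nonzero if 'field' is still set in a 64-bit mask. */
static int survives_64bit(uint64_t field)
{
	uint64_t wide = field;

	return wide != 0;
}
```

A flag at bit 34 is silently lost in a 32-bit mask but preserved in a 64-bit
one, so any code that stores or compares these flags needs a 64-bit type as
well.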
diff --git a/configs/samples/websocket_client.conf.sample b/configs/samples/websocket_client.conf.sample
index 92d8f34641..706646becf 100644
--- a/configs/samples/websocket_client.conf.sample
+++ b/configs/samples/websocket_client.conf.sample
@@ -49,3 +49,111 @@
;verify_server_hostname = no ; Verify that the hostname in the server's certificate
; matches the hostname in the URI configured above.
; Default: yes
+;
+; Outbound/Forward HTTP Proxy
+;
+; If you need to use a proxy to reach the websocket server, you can configure it below.
+; Because WebSockets aren't consistently supported among proxies (specifically passing
+; through UPGRADEs), the CONNECT method is always used to establish a TCP tunnel
+; through the proxy. This is required if a TLS session is to be established with the
+; WebSocket server anyway. It's important to understand that the negotiation with the
+; proxy is ALWAYS unsecured. Once the proxy establishes the tunnel, the TLS session
+; will be negotiated directly with the remote WebSocket server via the tunnel.
+;
+;proxy_host = proxy:8080 ; The <hostname>:<port> for the forward proxy.
+ ; Default: none
+;proxy_username = username ; An authentication username for the proxy.
+ ; Default: none
+;proxy_password = password ; An authentication password for the proxy.
+ ; Default: none
+
+;
+; Detecting active connection failures.
+;
+; Generally speaking, if the websocket server process dies, the host operating
+; system will automatically close the TCP connection. Asterisk will detect
+; that immediately and attempt to re-establish the connection using the configured
+; connection_timeout, reconnect_interval and reconnect_attempts parameters.
+;
+; However if the host server itself dies or packets are being dropped in the
+; network, Asterisk won't be able to detect this until the next time it tries
+; to send data. To address this, there are two capabilities available:
+; TCP Keepalives and WebSocket PING/PONG.
+;
+; Which method you choose is entirely up to you and your environment. Nothing
+; prevents you from activating both but sending a PING frame will reset the
+; TCP keepalive idle timeout so, depending on the intervals configured, the
+; connection may never be idle long enough to trigger sending keepalives.
+;
+; * TCP Keepalives
+;
+; TCP keepalives are implemented by the Linux kernel and once configured, require no
+; resources from Asterisk. The same can be said for the server side as replying to
+; the keepalives is also handled entirely by the operating system without any
+; involvement from application servers. Once the configured number of replies are
+; missed, the kernel will close the socket which notifies Asterisk and causes an
+; attempt to reconnect using the configured connection_timeout, reconnect_interval
+; and reconnect_attempts parameters.
+;
+; The one downside to TCP keepalives is that they won't traverse a proxy so
+; you'll really only be testing if you can reach the proxy, not the websocket
+; server itself. However, if the proxy itself can be configured to do TCP
+; keepalives to the websocket server, you could still use TCP keepalives
+; between Asterisk and the proxy to create an effective end-to-end test.
+;
+;enable_tcp_keepalives = yes ; Enable TCP keepalives on outgoing WebSockets.
+ ; Default: no
+
+;tcp_keepalive_time = 20 ; Start sending keepalives when no data has been sent on
+ ; the connection for this many seconds.
+ ; Default: 20 seconds
+
+;tcp_keepalive_interval = 20 ; Send keepalives at this interval in seconds.
+ ; If a reply isn't received by the time the next keepalive
+ ; is due to be sent, it's considered missed so this option
+ ; also controls how long it takes to detect a failure.
+ ; See below.
+ ; Default: 20 seconds
+
+;tcp_keepalive_probes = 3 ; Close the connection after this many missed replies.
+ ; The time to detect a failure will be between
+ ; (probes * interval) and ((probes + 1) * interval) seconds.
+ ; If the connection closes and reconnect_interval and
+ ; reconnect_attempts are set, a new connection will be
+ ; attempted using those parameters.
+ ; Default: 3 probes
+;
+; * WebSocket PING/PONG frames
+;
+; WebSocket PING/PONG frames are another way to detect failures. When configured,
+; Asterisk will send a WebSocket PING frame to the server at the configured interval.
+; All WebSocket server implementations are REQUIRED to respond with a PONG frame.
+; Once the configured number of PONG replies are missed, Asterisk will close the
+; connection and attempt to reconnect using the configured connection_timeout,
+; reconnect_interval and reconnect_attempts parameters. Unlike TCP keepalives
+; however, Asterisk must create a scheduler thread and manage the process so it's
+; not without overhead, however small.
+;
+; In contrast to TCP keepalives, WebSocket PING/PONGs will traverse a proxy.
+;
+;enable_pingpongs = yes ; Enable Websocket PING/PONGs
+ ; Default: no
+
+;pingpong_interval = 20 ; Send WebSocket PINGs at this interval in seconds.
+ ; If a reply isn't received by the time the next PING is due
+ ; to be sent, it's considered missed so this option also controls
+ ; how long it takes to detect a failure. See below.
+ ; Default: 20 seconds
+
+;pingpong_probes = 3 ; Close the connection after this many missed PONG replies.
+ ; The time to detect a failure will be between
+ ; (probes * interval) and ((probes + 1) * interval)
+ ; seconds. If the connection closes and reconnect_interval and
+ ; reconnect_attempts are set, a new connection will be attempted using
+ ; those parameters.
+ ; Default: 3 probes
+;
+; Also be aware...many websocket server implementations can send PING frames
+; _to_ Asterisk which will automatically generate PONG replies. These won't
+; interfere with Asterisk sending its own PINGs.
+;
diff --git a/include/asterisk/http_websocket.h b/include/asterisk/http_websocket.h
index 42629f2ef6..8e542b8a9a 100644
--- a/include/asterisk/http_websocket.h
+++ b/include/asterisk/http_websocket.h
@@ -75,6 +75,7 @@ enum ast_websocket_opcode {
AST_WEBSOCKET_OPCODE_PONG = 0xA, /*!< Response to a ping */
AST_WEBSOCKET_OPCODE_CLOSE = 0x8, /*!< Connection is being closed */
AST_WEBSOCKET_OPCODE_CONTINUATION = 0x0, /*!< Continuation of a previous frame */
+ AST_WEBSOCKET_OPCODE_UNKNOWN = 0xf, /*!< Error */
};
/*! \brief Websocket Status Codes from RFC-6455 */
@@ -460,6 +461,7 @@ enum ast_websocket_result {
WS_WRITE_ERROR,
WS_CLIENT_START_ERROR,
WS_UNAUTHORIZED,
+ WS_TLS_ERROR,
};
/*!
@@ -480,9 +482,15 @@ enum ast_websocket_result {
* \param protocols a comma separated string of supported protocols
* \param tls_cfg secure websocket credentials
* \param result result code set on client failure
+ *
* \return a client websocket.
* \retval NULL if object could not be created or connected
* \since 13
+ *
+ * \warning The returned websocket must be closed with \ref ast_websocket_close
+ * and its reference count decremented with \ref ast_websocket_unref when
+ * it's no longer needed.
+ *
*/
AST_OPTIONAL_API(struct ast_websocket *, ast_websocket_client_create,
(const char *uri, const char *protocols,
@@ -517,9 +525,29 @@ struct ast_websocket_client_options {
* Secure websocket credentials
*/
struct ast_tls_config *tls_cfg;
- const char *username; /*!< Auth username */
- const char *password; /*!< Auth password */
+ const char *username; /*!< WebSocket server auth username */
+ const char *password; /*!< WebSocket server auth password */
+
int suppress_connection_msgs; /*!< Suppress connection log messages */
+ /*!
+ * Forward proxy
+ */
+ const char *proxy_host; /*!< Proxy server host:port */
+ const char *proxy_username; /*!< Proxy server auth username */
+ const char *proxy_password; /*!< Proxy server auth password */
+ /*!
+ * TCP Keepalives
+ */
+ int tcp_keepalives; /*!< Enable TCP keepalives */
+ unsigned int tcp_keepalive_time; /*!< Start sending when connection has been idle for this many seconds */
+ unsigned int tcp_keepalive_interval; /*!< Send keepalives at this interval in seconds */
+ unsigned int tcp_keepalive_probes; /*!< Close connection after this many missed responses */
+ /*!
+ * WebSocket PING/PONG
+ */
+ int pingpongs; /*!< Enable Websocket PING/PONGs */
+ unsigned int pingpong_interval; /*!< Send PING messages at this interval in seconds */
+ unsigned int pingpong_probes; /*!< Close connection after this many missed responses */
};
/*!
@@ -536,6 +564,10 @@ struct ast_websocket_client_options {
*
* \return a client websocket.
* \retval NULL if object could not be created or connected
+ *
+ * \warning The returned websocket must be closed with \ref ast_websocket_close
+ * and its reference count decremented with \ref ast_websocket_unref when
+ * it's no longer needed.
*/
AST_OPTIONAL_API(struct ast_websocket *, ast_websocket_client_create_with_options,
(struct ast_websocket_client_options *options,
diff --git a/include/asterisk/sorcery.h b/include/asterisk/sorcery.h
index 6e8b1c3e68..d2671e697d 100644
--- a/include/asterisk/sorcery.h
+++ b/include/asterisk/sorcery.h
@@ -1727,7 +1727,7 @@ extern int ast_sorcery_update_or_create_on_update_miss;
*/
#define ast_sorcery_register_uint(object, structure, option, field, def_value) \
ast_sorcery_object_field_register(sorcery, #object, #option, \
- _stringify(def_value), OPT_UINT_T, PARSE_IN_RANGE, \
+ _sorcery_stringify(def_value), OPT_UINT_T, PARSE_IN_RANGE, \
FLDSET(struct structure, field), 0, UINT_MAX)
/*!
diff --git a/include/asterisk/tcptls.h b/include/asterisk/tcptls.h
index dcf13e49f8..047ff1387d 100644
--- a/include/asterisk/tcptls.h
+++ b/include/asterisk/tcptls.h
@@ -218,6 +218,47 @@ void ast_tcptls_server_start(struct ast_tcptls_session_args *desc);
*/
void ast_tcptls_server_stop(struct ast_tcptls_session_args *desc);
+/*!
+ * \brief Set up an SSL client
+ *
+ * \since 20.21.0
+ * \since 22.11.0
+ * \since 23.5.0
+ *
+ * \note This function only needs to be called if an unsecured tcptls session is
+ * already established and you need to switch it to TLS. This function doesn't
+ * actually start any negotiation. It just reads any certificates, key files
+ * and options from the config and creates the SSL context.
+ *
+ * \param cfg Configuration for the SSL client
+ *
+ * \retval 1 Success
+ * \retval 0 Failure
+ */
+int ast_ssl_setup_client(struct ast_tls_config *cfg);
+
+/*!
+ * \brief Start TLS negotiation on an existing unsecured connection
+ *
+ * \since 20.21.0
+ * \since 22.11.0
+ * \since 23.5.0
+ *
+ * \note This function only needs to be called if an unsecured tcptls session is
+ * already established and you need to switch it to TLS. This function performs
+ * the handshake and validation. \ref ast_ssl_setup_client must be called first
+ * to create the SSL context.
+ *
+ * \param tcptls_session The existing unsecured session
+ *
+ * \warning If this function fails, it will automatically dereference tcptls_session
+ * and close the connection so don't attempt to dereference it again.
+ *
+ * \retval tcptls_session on Success
+ * \retval NULL Failure
+ */
+struct ast_tcptls_session_instance *ast_tcptls_start_tls(struct ast_tcptls_session_instance *tcptls_session);
+
/*!
* \brief Set up an SSL server
*
diff --git a/include/asterisk/websocket_client.h b/include/asterisk/websocket_client.h
index f62907407f..0b12e62ba6 100644
--- a/include/asterisk/websocket_client.h
+++ b/include/asterisk/websocket_client.h
@@ -22,36 +22,56 @@
#include "asterisk/http_websocket.h"
#include "asterisk/sorcery.h"
+/*
+ * Using 1ULL is important as it forces the enum to be 64 bits.
+ */
enum ast_ws_client_fields {
AST_WS_CLIENT_FIELD_NONE = 0,
- AST_WS_CLIENT_FIELD_URI = (1 << 0),
- AST_WS_CLIENT_FIELD_PROTOCOLS = (1 << 1),
- AST_WS_CLIENT_FIELD_USERNAME = (1 << 3),
- AST_WS_CLIENT_FIELD_PASSWORD = (1 << 4),
- AST_WS_CLIENT_FIELD_TLS_ENABLED = (1 << 7),
- AST_WS_CLIENT_FIELD_CA_LIST_FILE = (1 << 8),
- AST_WS_CLIENT_FIELD_CA_LIST_PATH = (1 << 9),
- AST_WS_CLIENT_FIELD_CERT_FILE = (1 << 10),
- AST_WS_CLIENT_FIELD_PRIV_KEY_FILE = (1 << 11),
- AST_WS_CLIENT_FIELD_CONNECTION_TYPE = (1 << 13),
- AST_WS_CLIENT_FIELD_RECONNECT_INTERVAL = (1 << 14),
- AST_WS_CLIENT_FIELD_RECONNECT_ATTEMPTS = (1 << 15),
- AST_WS_CLIENT_FIELD_CONNECTION_TIMEOUT = (1 << 16),
- AST_WS_CLIENT_FIELD_VERIFY_SERVER_CERT = (1 << 17),
- AST_WS_CLIENT_FIELD_VERIFY_SERVER_HOSTNAME = (1 << 18),
+ AST_WS_CLIENT_FIELD_URI = (1ULL << 0),
+ AST_WS_CLIENT_FIELD_PROTOCOLS = (1ULL << 1),
+ AST_WS_CLIENT_FIELD_USERNAME = (1ULL << 3),
+ AST_WS_CLIENT_FIELD_PASSWORD = (1ULL << 4),
+ AST_WS_CLIENT_FIELD_TLS_ENABLED = (1ULL << 7),
+ AST_WS_CLIENT_FIELD_CA_LIST_FILE = (1ULL << 8),
+ AST_WS_CLIENT_FIELD_CA_LIST_PATH = (1ULL << 9),
+ AST_WS_CLIENT_FIELD_CERT_FILE = (1ULL << 10),
+ AST_WS_CLIENT_FIELD_PRIV_KEY_FILE = (1ULL << 11),
+ AST_WS_CLIENT_FIELD_CONNECTION_TYPE = (1ULL << 13),
+ AST_WS_CLIENT_FIELD_RECONNECT_INTERVAL = (1ULL << 14),
+ AST_WS_CLIENT_FIELD_RECONNECT_ATTEMPTS = (1ULL << 15),
+ AST_WS_CLIENT_FIELD_CONNECTION_TIMEOUT = (1ULL << 16),
+ AST_WS_CLIENT_FIELD_VERIFY_SERVER_CERT = (1ULL << 17),
+ AST_WS_CLIENT_FIELD_VERIFY_SERVER_HOSTNAME = (1ULL << 18),
+ AST_WS_CLIENT_FIELD_PROXY_HOST = (1ULL << 19),
+ AST_WS_CLIENT_FIELD_PROXY_USERNAME = (1ULL << 20),
+ AST_WS_CLIENT_FIELD_PROXY_PASSWORD = (1ULL << 21),
+ AST_WS_CLIENT_FIELD_TCP_KEEPALIVES = (1ULL << 22),
+ AST_WS_CLIENT_FIELD_TCP_KEEPALIVE_TIME = (1ULL << 23),
+ AST_WS_CLIENT_FIELD_TCP_KEEPALIVE_INTERVAL = (1ULL << 24),
+ AST_WS_CLIENT_FIELD_TCP_KEEPALIVE_PROBES = (1ULL << 25),
+ AST_WS_CLIENT_FIELD_PINGPONGS = (1ULL << 26),
+ AST_WS_CLIENT_FIELD_PINGPONG_INTERVAL = (1ULL << 27),
+ AST_WS_CLIENT_FIELD_PINGPONG_PROBES = (1ULL << 28),
AST_WS_CLIENT_NEEDS_RECONNECT = AST_WS_CLIENT_FIELD_URI | AST_WS_CLIENT_FIELD_PROTOCOLS
| AST_WS_CLIENT_FIELD_CONNECTION_TYPE
| AST_WS_CLIENT_FIELD_USERNAME | AST_WS_CLIENT_FIELD_PASSWORD
| AST_WS_CLIENT_FIELD_TLS_ENABLED | AST_WS_CLIENT_FIELD_CA_LIST_FILE
| AST_WS_CLIENT_FIELD_CA_LIST_PATH | AST_WS_CLIENT_FIELD_CERT_FILE
| AST_WS_CLIENT_FIELD_PRIV_KEY_FILE | AST_WS_CLIENT_FIELD_VERIFY_SERVER_CERT
- | AST_WS_CLIENT_FIELD_VERIFY_SERVER_HOSTNAME,
+ | AST_WS_CLIENT_FIELD_VERIFY_SERVER_HOSTNAME
+ | AST_WS_CLIENT_FIELD_PROXY_HOST | AST_WS_CLIENT_FIELD_PROXY_USERNAME
+ | AST_WS_CLIENT_FIELD_PROXY_PASSWORD
+ | AST_WS_CLIENT_FIELD_TCP_KEEPALIVES
+ | AST_WS_CLIENT_FIELD_TCP_KEEPALIVE_TIME | AST_WS_CLIENT_FIELD_TCP_KEEPALIVE_INTERVAL
+ | AST_WS_CLIENT_FIELD_TCP_KEEPALIVE_PROBES
+ | AST_WS_CLIENT_FIELD_PINGPONGS | AST_WS_CLIENT_FIELD_PINGPONG_INTERVAL
+ | AST_WS_CLIENT_FIELD_PINGPONG_PROBES,
};
/*
- * The first 23 fields are reserved for the websocket client core.
+ * The first 29 fields are reserved for the websocket client core.
*/
-#define AST_WS_CLIENT_FIELD_USER_START 24
+#define AST_WS_CLIENT_FIELD_USER_START 30
struct ast_websocket_client {
SORCERY_OBJECT(details);
@@ -75,6 +95,16 @@ struct ast_websocket_client {
int verify_server_cert; /*!< Verify server certificate */
int verify_server_hostname; /*!< Verify server hostname */
AST_STRING_FIELD_EXTENDED(uri_params); /*!< Additional URI parameters */
+ AST_STRING_FIELD_EXTENDED(proxy_host); /*!< Proxy server host:port */
+ AST_STRING_FIELD_EXTENDED(proxy_username); /*!< Proxy username */
+ AST_STRING_FIELD_EXTENDED(proxy_password); /*!< Proxy password */
+ int tcp_keepalives; /*!< Enable TCP Keepalives */
+ unsigned int tcp_keepalive_time; /*!< Start sending when connection has been idle for this many seconds */
+ unsigned int tcp_keepalive_interval; /*!< Send keepalives at this interval in seconds */
+ unsigned int tcp_keepalive_probes; /*!< Close connection after this many missed responses */
+ int pingpongs; /*!< Enable WebSocket PING/PONGs */
+ unsigned int pingpong_interval; /*!< Send WebSocket PINGs at this interval in seconds */
+ unsigned int pingpong_probes; /*!< Close connection after this many missed PONG responses */
};
/*!
@@ -134,6 +164,10 @@ void ast_websocket_client_observer_remove(
* result of the connection attempt.
*
* \return A pointer to the ast_websocket structure on success, or NULL on failure.
+ *
+ * \warning The returned websocket must be closed with \ref ast_websocket_close
+ * and its reference count decremented with \ref ast_websocket_unref when
+ * it's no longer needed.
*/
struct ast_websocket *ast_websocket_client_connect(struct ast_websocket_client *wc,
void *lock_obj, const char *display_name, enum ast_websocket_result *result);
diff --git a/main/tcptls.c b/main/tcptls.c
index 99546e5834..fcbbe9063e 100644
--- a/main/tcptls.c
+++ b/main/tcptls.c
@@ -130,48 +130,13 @@ static void write_openssl_error_to_log(void)
}
#endif
-/*! \brief
-* creates a FILE * from the fd passed by the accept thread.
-* This operation is potentially expensive (certificate verification),
-* so we do it in the child thread context.
-*
-* \note must decrement ref count before returning NULL on error
-*/
-static void *handle_tcptls_connection(void *data)
+struct ast_tcptls_session_instance *ast_tcptls_start_tls(struct ast_tcptls_session_instance *tcptls_session)
{
- struct ast_tcptls_session_instance *tcptls_session = data;
#ifdef DO_SSL
SSL *ssl;
#endif
- /* TCP/TLS connections are associated with external protocols, and
- * should not be allowed to execute 'dangerous' functions. This may
- * need to be pushed down into the individual protocol handlers, but
- * this seems like a good general policy.
- */
- if (ast_thread_inhibit_escalations()) {
- ast_log(LOG_ERROR, "Failed to inhibit privilege escalations; killing connection from peer '%s'\n",
- ast_sockaddr_stringify(&tcptls_session->remote_address));
- ast_tcptls_close_session_file(tcptls_session);
- ao2_ref(tcptls_session, -1);
- return NULL;
- }
-
- /*
- * TCP/TLS connections are associated with external protocols which can
- * be considered to be user interfaces (even for SIP messages), and
- * will not handle channel media. This may need to be pushed down into
- * the individual protocol handlers, but this seems like a good start.
- */
- if (ast_thread_user_interface_set(1)) {
- ast_log(LOG_ERROR, "Failed to set user interface status; killing connection from peer '%s'\n",
- ast_sockaddr_stringify(&tcptls_session->remote_address));
- ast_tcptls_close_session_file(tcptls_session);
- ao2_ref(tcptls_session, -1);
- return NULL;
- }
-
- if (tcptls_session->parent->tls_cfg) {
+ if (tcptls_session->parent->tls_cfg && tcptls_session->parent->tls_cfg->enabled) {
#ifdef DO_SSL
if (ast_iostream_start_tls(&tcptls_session->stream, tcptls_session->parent->tls_cfg->ssl_ctx, tcptls_session->client) < 0) {
SSL *ssl = ast_iostream_get_ssl(tcptls_session->stream);
@@ -270,6 +235,51 @@ static void *handle_tcptls_connection(void *data)
#endif /* DO_SSL */
}
+ return tcptls_session;
+}
+
+/*! \brief
+* creates a FILE * from the fd passed by the accept thread.
+* This operation is potentially expensive (certificate verification),
+* so we do it in the child thread context.
+*
+* \note must decrement ref count before returning NULL on error
+*/
+static void *handle_tcptls_connection(void *data)
+{
+ struct ast_tcptls_session_instance *tcptls_session = data;
+
+ /* TCP/TLS connections are associated with external protocols, and
+ * should not be allowed to execute 'dangerous' functions. This may
+ * need to be pushed down into the individual protocol handlers, but
+ * this seems like a good general policy.
+ */
+ if (ast_thread_inhibit_escalations()) {
+ ast_log(LOG_ERROR, "Failed to inhibit privilege escalations; killing connection from peer '%s'\n",
+ ast_sockaddr_stringify(&tcptls_session->remote_address));
+ ast_tcptls_close_session_file(tcptls_session);
+ ao2_ref(tcptls_session, -1);
+ return NULL;
+ }
+
+ /*
+ * TCP/TLS connections are associated with external protocols which can
+ * be considered to be user interfaces (even for SIP messages), and
+ * will not handle channel media. This may need to be pushed down into
+ * the individual protocol handlers, but this seems like a good start.
+ */
+ if (ast_thread_user_interface_set(1)) {
+ ast_log(LOG_ERROR, "Failed to set user interface status; killing connection from peer '%s'\n",
+ ast_sockaddr_stringify(&tcptls_session->remote_address));
+ ast_tcptls_close_session_file(tcptls_session);
+ ao2_ref(tcptls_session, -1);
+ return NULL;
+ }
+
+ if (ast_tcptls_start_tls(tcptls_session) == NULL) {
+ return NULL;
+ }
+
if (tcptls_session->parent->worker_fn) {
return tcptls_session->parent->worker_fn(tcptls_session);
} else {
@@ -579,6 +589,11 @@ int ast_ssl_setup(struct ast_tls_config *cfg)
return __ssl_setup(cfg, 0, 0);
}
+int ast_ssl_setup_client(struct ast_tls_config *cfg)
+{
+ return __ssl_setup(cfg, 1, 1);
+}
+
void ast_ssl_teardown(struct ast_tls_config *cfg)
{
#ifdef DO_SSL
diff --git a/res/ari/ari_websockets.c b/res/ari/ari_websockets.c
index b633567579..945c43a324 100644
--- a/res/ari/ari_websockets.c
+++ b/res/ari/ari_websockets.c
@@ -733,7 +733,7 @@ static int session_update(struct ari_ws_session *ari_ws_session,
general->write_timeout);
}
- ao2_ref(ast_ws_session, +1);
+ ast_websocket_ref(ast_ws_session);
ari_ws_session->ast_ws_session = ast_ws_session;
ao2_lock(ari_ws_session);
for (i = 0; i < AST_VECTOR_SIZE(&ari_ws_session->message_queue); i++) {
@@ -853,6 +853,7 @@ static void websocket_established_cb(struct ast_websocket *ast_ws_session,
upgrade_headers, ari_ws_session->app_name, msg);
ast_json_unref(msg);
}
+ ast_websocket_close(ast_ws_session, AST_WEBSOCKET_STATUS_GOING_AWAY);
ari_ws_session->connected = 0;
SCOPE_EXIT("%s: Websocket closed\n", remote_addr);
@@ -1003,6 +1004,8 @@ static void *outbound_session_handler_thread(void *obj)
* We only want to send "ApplicationRegistered" events in the
* case of a reconnect. The initial connection will have already sent
* the events when outbound_register_apps() was called.
+ *
+ * Note: session_update() bumps astws.
*/
session_update(session, astws, !already_sent_registers);
already_sent_registers = 0;
@@ -1022,6 +1025,8 @@ static void *outbound_session_handler_thread(void *obj)
session->thread = 0;
session->connected = 0;
ast_websocket_close(astws, 1000);
+ /* Clean up the reference held by session_update() */
+ ast_websocket_unref(astws);
session->ast_ws_session = NULL;
break;
}
@@ -1042,6 +1047,8 @@ static void *outbound_session_handler_thread(void *obj)
}
session->connected = 0;
+ ast_websocket_close(session->ast_ws_session, AST_WEBSOCKET_STATUS_GOING_AWAY);
+ /* Clean up the reference held by session_update() */
ast_websocket_unref(session->ast_ws_session);
session->ast_ws_session = NULL;
if (session->closing) {
diff --git a/res/ari/internal.h b/res/ari/internal.h
index abc3381cd0..1aed1e1349 100644
--- a/res/ari/internal.h
+++ b/res/ari/internal.h
@@ -96,13 +96,17 @@ struct ari_conf_user {
struct ast_acl_list *acl;
};
+/*
+ * Using 1ULL is important as it forces the enum to be 64 bits to match
+ * the size of enum ast_ws_client_fields.
+ */
enum ari_conf_owc_fields {
ARI_OWC_FIELD_NONE = 0,
- ARI_OWC_FIELD_WEBSOCKET_CONNECTION_ID = (1 << AST_WS_CLIENT_FIELD_USER_START),
- ARI_OWC_FIELD_APPS = (1 << (AST_WS_CLIENT_FIELD_USER_START + 1)),
- ARI_OWC_FIELD_LOCAL_ARI_USER = (1 << (AST_WS_CLIENT_FIELD_USER_START + 2)),
- ARI_OWC_FIELD_LOCAL_ARI_PASSWORD = (1 << (AST_WS_CLIENT_FIELD_USER_START + 3)),
- ARI_OWC_FIELD_SUBSCRIBE_ALL = (1 << (AST_WS_CLIENT_FIELD_USER_START + 4)),
+ ARI_OWC_FIELD_WEBSOCKET_CONNECTION_ID = (1ULL << AST_WS_CLIENT_FIELD_USER_START),
+ ARI_OWC_FIELD_APPS = (1ULL << (AST_WS_CLIENT_FIELD_USER_START + 1)),
+ ARI_OWC_FIELD_LOCAL_ARI_USER = (1ULL << (AST_WS_CLIENT_FIELD_USER_START + 2)),
+ ARI_OWC_FIELD_LOCAL_ARI_PASSWORD = (1ULL << (AST_WS_CLIENT_FIELD_USER_START + 3)),
+ ARI_OWC_FIELD_SUBSCRIBE_ALL = (1ULL << (AST_WS_CLIENT_FIELD_USER_START + 4)),
ARI_OWC_NEEDS_RECONNECT = AST_WS_CLIENT_NEEDS_RECONNECT
| ARI_OWC_FIELD_WEBSOCKET_CONNECTION_ID | ARI_OWC_FIELD_LOCAL_ARI_USER
| ARI_OWC_FIELD_LOCAL_ARI_PASSWORD,
diff --git a/res/res_aeap/transport_websocket.c b/res/res_aeap/transport_websocket.c
index 801e4d82a0..86fdf8db28 100644
--- a/res/res_aeap/transport_websocket.c
+++ b/res/res_aeap/transport_websocket.c
@@ -60,8 +60,7 @@ static int websocket_disconnect(struct aeap_transport *self)
struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
if (transport->ws) {
- ast_websocket_unref(transport->ws);
- transport->ws = NULL;
+ ast_websocket_close(transport->ws, AST_WEBSOCKET_STATUS_NORMAL);
}
return 0;
@@ -69,11 +68,12 @@ static int websocket_disconnect(struct aeap_transport *self)
static void websocket_destroy(struct aeap_transport *self)
{
- /*
- * Disconnect takes care of cleaning up the websocket. Note, disconnect
- * was called by the base/dispatch interface prior to calling this
- * function so nothing to do here.
- */
+ struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
+
+ if (transport->ws) {
+ ast_websocket_unref(transport->ws);
+ transport->ws = NULL;
+ }
}
static intmax_t websocket_read(struct aeap_transport *self, void *buf, intmax_t size,
diff --git a/res/res_http_websocket.c b/res/res_http_websocket.c
index 2854f6be2e..d062a226a4 100644
--- a/res/res_http_websocket.c
+++ b/res/res_http_websocket.c
@@ -34,6 +34,7 @@
#include "asterisk/astobj2.h"
#include "asterisk/strings.h"
#include "asterisk/file.h"
+#include "asterisk/sched.h"
#include "asterisk/unaligned.h"
#include "asterisk/uri.h"
#include "asterisk/uuid.h"
@@ -78,6 +79,53 @@
#define MAX_WS_HDR_SZ 14
#define MIN_WS_HDR_SZ 2
+/*! \brief WS_PING_PAYLOAD
+ * It's possible that a user of this API could be sending their own PINGs
+ * and expecting to see PONGs so we use the PING_PAYLOAD in the PINGs we
+ * send so we can detect that any PONGs we receive are from our own PINGs.
+ */
+#define WS_PING_PAYLOAD "WS_CLIENT_PING"
+#define WS_PING_PAYLOAD_LEN 14
+
+static struct ast_sched_context *ping_scheduler;
+
+enum ws_closed_by {
+ WS_NOT_CLOSED = 0,
+ WS_CLOSED_BY_REMOTE,
+ WS_CLOSED_BY_US,
+};
+
+struct websocket_client {
+ /*! Options used to create the client */
+ struct ast_websocket_client_options *options;
+ /*! host portion of client uri */
+ char *host;
+ /*! path for logical websocket connection */
+ struct ast_str *resource_name;
+ /*! unique key used during server handshaking */
+ char *key;
+ /*! container for registered protocols */
+ char *protocols;
+ /*! the protocol accepted by the server */
+ char *accept_protocol;
+ /*! websocket protocol version */
+ int version;
+ /*! tcptls connection arguments */
+ struct ast_tcptls_session_args *args;
+ /*! tcptls connection instance */
+ struct ast_tcptls_session_instance *ser;
+ /*! Authentication userid:password */
+ char *userinfo;
+ /*! Suppress connection log messages */
+ int suppress_connection_msgs;
+ /*! Proxy-Authentication userid:password */
+ char *proxy_userinfo;
+ /*! The ping scheduler timer id */
+ int ping_sched_timer;
+ /*! How many missed pong responses currently */
+ int missed_pong_count;
+};
+
/*! \brief Structure definition for session */
struct ast_websocket {
struct ast_iostream *stream; /*!< iostream of the connection */
@@ -91,12 +139,31 @@ struct ast_websocket {
unsigned int secure:1; /*!< Bit to indicate that the transport is secure */
unsigned int closing:1; /*!< Bit to indicate that the session is in the process of being closed */
unsigned int close_sent:1; /*!< Bit to indicate that the session close opcode has been sent and no further data will be sent */
+ unsigned int non_blocking:1; /*!< Bit to indicate that the socket is non-blocking */
struct websocket_client *client; /*!< Client object when connected as a client websocket */
char session_id[AST_UUID_STR_LEN]; /*!< The identifier for the websocket session */
uint16_t close_status_code; /*!< Status code sent in a CLOSE frame upon shutdown */
+ enum ws_closed_by closed_by; /*!< Who's closing the websocket? */
char buf[AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE]; /*!< Fixed buffer for reading data into */
};
+#define WS_SESSION_REMOTE(_session) (_session ? (_session->client ? _session->client->options->uri : ast_sockaddr_stringify(&_session->remote_address)) : "NULL")
+#define ARE_PINGPONGS_ENABLED(_session) (_session && _session->client && _session->client->options->pingpongs && _session->client->ping_sched_timer >= 0)
+
+static const char *closed_by_str[] = {
+ [WS_NOT_CLOSED] = "not closed",
+ [WS_CLOSED_BY_REMOTE] = "remote",
+ [WS_CLOSED_BY_US] = "local"
+};
+
+static const char *closed_by_to_str(enum ws_closed_by closed_by)
+{
+ if (!ARRAY_IN_BOUNDS(closed_by, closed_by_str)) {
+ return "unknown";
+ }
+ return closed_by_str[closed_by];
+}
+
const char *ast_websocket_type_to_str(enum ast_websocket_type type)
{
switch (type) {
@@ -189,19 +256,21 @@ struct ast_websocket_server *AST_OPTIONAL_API_NAME(ast_websocket_server_create)(
static void session_destroy_fn(void *obj)
{
struct ast_websocket *session = obj;
+ char *id = ast_strdupa(WS_SESSION_REMOTE(session));
+ SCOPE_ENTER(2, "%s: Session %p destructor\n", id, obj);
if (session->stream) {
ast_websocket_close(session, session->close_status_code);
if (session->stream) {
ast_iostream_close(session->stream);
session->stream = NULL;
- ast_debug(3, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from",
- ast_sockaddr_stringify(&session->remote_address));
+ ast_trace(-1, "%s: WebSocket connection closed\n", WS_SESSION_REMOTE(session));
}
}
ao2_cleanup(session->client);
ast_free(session->payload);
+ SCOPE_EXIT_RTN("%s: Session %p destructor complete\n", id, obj);
}
struct ast_websocket_protocol *AST_OPTIONAL_API_NAME(ast_websocket_sub_protocol_alloc)(const char *name)
@@ -319,83 +388,187 @@ static void websocket_mask_payload(struct ast_websocket *session, char *frame, c
}
}
+static const char *opcode_map[] = {
+ [AST_WEBSOCKET_OPCODE_CONTINUATION] = "continuation",
+ [AST_WEBSOCKET_OPCODE_TEXT] = "text",
+ [AST_WEBSOCKET_OPCODE_BINARY] = "binary",
+ [AST_WEBSOCKET_OPCODE_CLOSE] = "close",
+ [AST_WEBSOCKET_OPCODE_PING] = "ping",
+ [AST_WEBSOCKET_OPCODE_PONG] = "pong",
+};
-/*! \brief Close function for websocket session */
-int AST_OPTIONAL_API_NAME(ast_websocket_close)(struct ast_websocket *session, uint16_t reason)
+static const char *websocket_opcode2str(enum ast_websocket_opcode opcode)
+{
+ if (!ARRAY_IN_BOUNDS(opcode, opcode_map)) {
+ return "<unknown>";
+ }
+ return opcode_map[opcode];
+}
+
+static void ping_scheduler_cancel(struct ast_websocket *session)
+{
+ int enabled = ARE_PINGPONGS_ENABLED(session);
+ SCOPE_ENTER(2, "%s: Cancelling PING/PONG keepalives\n", WS_SESSION_REMOTE(session));
+
+ if (!enabled) {
+ SCOPE_EXIT_RTN("%s: Not enabled, cancel not needed\n", WS_SESSION_REMOTE(session));
+ }
+ AST_SCHED_DEL(ping_scheduler, session->client->ping_sched_timer);
+ ao2_ref(session, -1);
+ SCOPE_EXIT_RTN("%s: Cancelled PING/PONG keepalives\n", WS_SESSION_REMOTE(session));
+}
+
+static int websocket_close(struct ast_websocket *session, uint16_t reason, int force)
{
enum ast_websocket_opcode opcode = AST_WEBSOCKET_OPCODE_CLOSE;
/* The header is either 2 or 6 bytes and the
* reason code takes up another 2 bytes */
char frame[8] = { 0, };
- int header_size, fsize, res;
+ int header_size, fsize, res = 0;
+ int fd = session->stream ? ast_iostream_get_fd(session->stream) : -1;
+ SCOPE_ENTER(2, "%s: Close requested. Reason: %s (%d) Closed by: %s Force: %s\n", WS_SESSION_REMOTE(session),
+ ast_websocket_status_to_str(reason), reason, closed_by_to_str(session->closed_by), AST_YESNO(force));
- if (session->close_sent) {
- return 0;
+ ping_scheduler_cancel(session);
+
+ ao2_lock(session);
+ if (session->closing) {
+ ao2_unlock(session);
+ SCOPE_EXIT_RTN_VALUE(0, "%s: Close already sent\n", WS_SESSION_REMOTE(session));
+ }
+
+ session->closing = 1;
+
+ if (!session->stream) {
+ ao2_unlock(session);
+ SCOPE_EXIT_RTN_VALUE(-1, "%s: WebSocket stream already closed\n", WS_SESSION_REMOTE(session));
+ }
+
+ if (force) {
+ ast_trace(-1, "%s: Forcing close. Handle: %p FD: %d\n", WS_SESSION_REMOTE(session),
+ session->stream, fd);
+ ast_iostream_close(session->stream);
+ session->stream = NULL;
+ ao2_unlock(session);
+ SCOPE_EXIT_RTN_VALUE(-1, "%s: Forced close\n", WS_SESSION_REMOTE(session));
}
/* clients need space for an additional 4 byte masking key */
header_size = session->client ? 6 : 2;
fsize = header_size + 2;
+ session->close_sent = 1;
frame[0] = opcode | 0x80;
frame[1] = 2; /* The reason code is always 2 bytes */
+ /*
+ * If the remote initiated the close we should respond with the same
+ * reason code they sent.
+ */
+ if (session->closed_by == WS_CLOSED_BY_REMOTE) {
+ reason = session->close_status_code;
+ }
+
/* If no reason has been specified assume 1000 which is normal closure */
put_unaligned_uint16(&frame[header_size], htons(reason ? reason : 1000));
websocket_mask_payload(session, frame, &frame[header_size], 2);
+ ast_trace(-1, "%s: Writing %sCLOSE frame with reason %s (%d). fd: %d\n", WS_SESSION_REMOTE(session),
+ session->closed_by == WS_CLOSED_BY_REMOTE ? "reply " : "", ast_websocket_status_to_str(reason), reason, fd);
- session->closing = 1;
- session->close_sent = 1;
-
- ao2_lock(session);
ast_iostream_set_timeout_inactivity(session->stream, session->timeout);
res = ast_iostream_write(session->stream, frame, fsize);
ast_iostream_set_timeout_disable(session->stream);
- /* If an error occurred when trying to close this connection explicitly terminate it now.
- * Doing so will cause the thread polling on it to wake up and terminate.
+ /*
+ * If the remote initiated the close or we failed to send,
+ * we can close the socket. If we just sent a CLOSE of our own, we need to wait
+ * until we get the close reply from the remote.
*/
- if (res != fsize) {
+ if (session->closed_by == WS_CLOSED_BY_REMOTE || res != fsize) {
+ ast_trace(-1, "%s: %s Closing socket. fd: %d\n", WS_SESSION_REMOTE(session),
+ session->closed_by == WS_CLOSED_BY_REMOTE ? "Wrote CLOSE reply." : "Writing CLOSE failed.",
+ fd);
+
ast_iostream_close(session->stream);
session->stream = NULL;
- ast_verb(2, "WebSocket connection %s '%s' forcefully closed due to fatal write error\n",
- session->client ? "to" : "from", ast_sockaddr_stringify(&session->remote_address));
+ ao2_unlock(session);
+ SCOPE_EXIT_RTN_VALUE(0, "%s: Socket closed after %s\n", WS_SESSION_REMOTE(session),
+ session->closed_by == WS_CLOSED_BY_REMOTE ? "reply" : "write failure");
}
ao2_unlock(session);
- return res == sizeof(frame);
+ SCOPE_EXIT_RTN_VALUE(res == sizeof(frame), "%s: Close done\n", WS_SESSION_REMOTE(session));
}
-static const char *opcode_map[] = {
- [AST_WEBSOCKET_OPCODE_CONTINUATION] = "continuation",
- [AST_WEBSOCKET_OPCODE_TEXT] = "text",
- [AST_WEBSOCKET_OPCODE_BINARY] = "binary",
- [AST_WEBSOCKET_OPCODE_CLOSE] = "close",
- [AST_WEBSOCKET_OPCODE_PING] = "ping",
- [AST_WEBSOCKET_OPCODE_PONG] = "pong",
-};
+static int websocket_handled_pong_or_close(struct ast_websocket *session, char *payload, uint64_t payload_len,
+ enum ast_websocket_opcode opcode)
+{
+ SCOPE_ENTER(4, "%s: Opcode: %s\n", WS_SESSION_REMOTE(session), websocket_opcode2str(opcode));
-static const char *websocket_opcode2str(enum ast_websocket_opcode opcode)
+ if (opcode == AST_WEBSOCKET_OPCODE_PONG) {
+ /*
+ * If it's from our own PING, reset the missed count.
+ */
+ if (session->client && session->client->missed_pong_count
+ && payload_len == WS_PING_PAYLOAD_LEN
+ && strncmp(payload, WS_PING_PAYLOAD, payload_len) == 0) {
+ int mpc = session->client->missed_pong_count;
+
+ session->client->missed_pong_count = 0;
+ SCOPE_EXIT_RTN_VALUE(1, "%s: Received PONG from our own PING. Missed count was: %d. Cleared.\n",
+ WS_SESSION_REMOTE(session), mpc);
+ } else {
+ SCOPE_EXIT_RTN_VALUE(0, "%s: Received PONG. Passing up to client.\n", WS_SESSION_REMOTE(session));
+
+ }
+ }
+
+ if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
+ if (payload_len >= 2) {
+ session->close_status_code = ntohs(get_unaligned_uint16(payload));
+ }
+ if (session->closed_by == WS_NOT_CLOSED) {
+ session->closed_by = WS_CLOSED_BY_REMOTE;
+ SCOPE_EXIT_RTN_VALUE(1, "%s: Handled CLOSE request by remote with reason %s (%d)\n", WS_SESSION_REMOTE(session),
+ ast_websocket_status_to_str(session->close_status_code), session->close_status_code);
+ }
+
+ ast_trace(-1, "%s: Received CLOSE response from remote with reason: %s (%d)\n",
+ WS_SESSION_REMOTE(session), ast_websocket_status_to_str(session->close_status_code),
+ session->close_status_code);
+ /*
+ * We got the close response so we can now clean up the socket.
+ */
+ websocket_close(session, session->close_status_code, 1);
+
+
+ SCOPE_EXIT_RTN_VALUE(1, "%s: Handled CLOSE\n", WS_SESSION_REMOTE(session));
+ }
+
+ SCOPE_EXIT_RTN_VALUE(0, "%s: Unhandled %s opcode\n", WS_SESSION_REMOTE(session), websocket_opcode2str(opcode));
+}
+
+/*! \brief Close function for websocket session */
+int AST_OPTIONAL_API_NAME(ast_websocket_close)(struct ast_websocket *session, uint16_t reason)
{
- if (opcode < AST_WEBSOCKET_OPCODE_CONTINUATION ||
- opcode > AST_WEBSOCKET_OPCODE_PONG) {
- return "<unknown>";
- } else {
- return opcode_map[opcode];
+ if (session->closed_by == WS_NOT_CLOSED) {
+ session->closed_by = WS_CLOSED_BY_US;
}
+
+ return websocket_close(session, reason, 0);
}
/*! \brief Write function for websocket traffic */
-int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, enum ast_websocket_opcode opcode, char *payload, uint64_t payload_size)
+int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, enum ast_websocket_opcode opcode,
+ char *payload, uint64_t payload_size)
{
size_t header_size = 2; /* The minimum size of a websocket frame is 2 bytes */
char *frame;
uint64_t length;
uint64_t frame_size;
-
- ast_debug(3, "Writing websocket %s frame, length %" PRIu64 "\n",
- websocket_opcode2str(opcode), payload_size);
+ SCOPE_ENTER(4, "%s: Opcode: %s Length: %"PRIu64"\n",
+ WS_SESSION_REMOTE(session), websocket_opcode2str(opcode), payload_size);
if (payload_size < 126) {
length = payload_size;
@@ -436,22 +609,25 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en
ao2_lock(session);
if (session->closing) {
ao2_unlock(session);
- return -1;
+ SCOPE_EXIT_RTN_VALUE(-1, "%s: Websocket already closing\n", WS_SESSION_REMOTE(session));
}
ast_iostream_set_timeout_sequence(session->stream, ast_tvnow(), session->timeout);
if (ast_iostream_write(session->stream, frame, frame_size) != frame_size) {
ao2_unlock(session);
/* 1011 - server terminating connection due to not being able to fulfill the request */
- ast_debug(1, "Closing WS with 1011 because we can't fulfill a write request\n");
- ast_websocket_close(session, 1011);
- return -1;
+ ast_trace(-1, "%s: Closing WS with 1011 because we can't fulfill a write request\n",
+ WS_SESSION_REMOTE(session));
+ websocket_close(session, 1011, 1);
+ SCOPE_EXIT_RTN_VALUE(-1, "%s: Closed WS with 1011 because we couldn't fulfill a write request\n",
+ WS_SESSION_REMOTE(session));
}
ast_iostream_set_timeout_disable(session->stream);
ao2_unlock(session);
- return 0;
+ SCOPE_EXIT_RTN_VALUE(0, "%s: Wrote opcode: %s length: %"PRIu64"\n",
+ WS_SESSION_REMOTE(session), websocket_opcode2str(opcode), payload_size);
}
void AST_OPTIONAL_API_NAME(ast_websocket_reconstruct_enable)(struct ast_websocket *session, size_t bytes)
@@ -466,12 +642,21 @@ void AST_OPTIONAL_API_NAME(ast_websocket_reconstruct_disable)(struct ast_websock
void AST_OPTIONAL_API_NAME(ast_websocket_ref)(struct ast_websocket *session)
{
+ char *id = ast_strdupa(WS_SESSION_REMOTE(session));
+ int refcount = session ? ao2_ref(session, 0) : 0;
+ SCOPE_ENTER(2, "%s: Reffing. Refcount: %d\n", id, refcount);
ao2_ref(session, +1);
+ SCOPE_EXIT("%s: Reffed. Refcount: %d\n", id, session ? refcount + 1 : 0);
}
void AST_OPTIONAL_API_NAME(ast_websocket_unref)(struct ast_websocket *session)
{
+ char *id = ast_strdupa(WS_SESSION_REMOTE(session));
+ int refcount = session ? ao2_ref(session, 0) : 0;
+ SCOPE_ENTER(2, "%s: Unreffing. Refcount: %d\n", id, refcount);
+
ao2_cleanup(session);
+ SCOPE_EXIT("%s: Unreffed. Refcount: %d\n", id, session ? refcount - 1 : 0);
}
int AST_OPTIONAL_API_NAME(ast_websocket_fd)(struct ast_websocket *session)
@@ -621,13 +806,14 @@ int AST_OPTIONAL_API_NAME(ast_websocket_read)(struct ast_websocket *session, cha
int mask_present = 0;
char *mask = NULL, *new_payload = NULL;
size_t options_len = 0, frame_size = 0;
+ SCOPE_ENTER(4, "%s: Reading\n", WS_SESSION_REMOTE(session));
*payload = NULL;
*payload_len = 0;
*fragmented = 0;
if (ws_safe_read(session, &session->buf[0], MIN_WS_HDR_SZ, opcode)) {
- return -1;
+ SCOPE_EXIT_RTN_VALUE(-1, "%s: Initial ws_safe_read failed\n", WS_SESSION_REMOTE(session));
}
frame_size += MIN_WS_HDR_SZ;
@@ -645,7 +831,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_read)(struct ast_websocket *session, cha
if (options_len) {
/* read the rest of the header options */
if (ws_safe_read(session, &session->buf[frame_size], options_len, opcode)) {
- return -1;
+ SCOPE_EXIT_RTN_VALUE(-1, "%s: ws_safe_read of options failed\n", WS_SESSION_REMOTE(session));
}
frame_size += options_len;
}
@@ -667,15 +853,15 @@ int AST_OPTIONAL_API_NAME(ast_websocket_read)(struct ast_websocket *session, cha
*payload = &session->buf[frame_size]; /* payload will start here, at the end of the options, if any */
frame_size = frame_size + (*payload_len); /* final frame size is header + optional headers + payload data */
if (frame_size > AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE) {
- ast_log(LOG_WARNING, "Cannot fit huge websocket frame of %zu bytes\n", frame_size);
/* The frame won't fit :-( */
ast_websocket_close(session, 1009);
- return -1;
+ SCOPE_EXIT_LOG_RTN_VALUE(-1, LOG_WARNING, "%s: Cannot fit huge websocket frame of %zu bytes\n",
+ WS_SESSION_REMOTE(session), frame_size);
}
if (*payload_len) {
if (ws_safe_read(session, *payload, *payload_len, opcode)) {
- return -1;
+ SCOPE_EXIT_RTN_VALUE(-1, "%s: ws_safe_read of payload failed\n", WS_SESSION_REMOTE(session));
}
}
@@ -693,33 +879,20 @@ int AST_OPTIONAL_API_NAME(ast_websocket_read)(struct ast_websocket *session, cha
ast_websocket_close(session, 1009);
}
*payload_len = 0;
- return 0;
+ SCOPE_EXIT_RTN_VALUE(0, "%s: PING received. Sent PONG\n", WS_SESSION_REMOTE(session));
}
- /* Stop PONG processing here */
- if (*opcode == AST_WEBSOCKET_OPCODE_PONG) {
- *payload_len = 0;
- return 0;
- }
-
- /* Save the CLOSE status code which will be sent in our own CLOSE in the destructor */
- if (*opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
- session->closing = 1;
- if (*payload_len >= 2) {
- session->close_status_code = ntohs(get_unaligned_uint16(*payload));
- }
- *payload_len = 0;
- return 0;
+ if (websocket_handled_pong_or_close(session, *payload, *payload_len, *opcode)) {
+ SCOPE_EXIT_RTN_VALUE(0, "%s: Handled PONG or CLOSE\n", WS_SESSION_REMOTE(session));
}
/* Below this point we are handling TEXT, BINARY or CONTINUATION opcodes */
if (*payload_len) {
if (!(new_payload = ast_realloc(session->payload, (session->payload_len + *payload_len)))) {
- ast_log(LOG_WARNING, "Failed allocation: %p, %zu, %"PRIu64"\n",
- session->payload, session->payload_len, *payload_len);
*payload_len = 0;
ast_websocket_close(session, 1009);
- return -1;
+ SCOPE_EXIT_LOG_RTN_VALUE(-1, LOG_WARNING, "%s: Failed allocation: %p, %zu, %"PRIu64"\n",
+ WS_SESSION_REMOTE(session), session->payload, session->payload_len, *payload_len);
}
session->payload = new_payload;
@@ -759,7 +932,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_read)(struct ast_websocket *session, cha
ast_websocket_close(session, 1003);
}
- return 0;
+ SCOPE_EXIT_RTN_VALUE(0, "%s: Read complete. Opcode: %s Length: %"PRIu64"\n",
+ WS_SESSION_REMOTE(session), websocket_opcode2str(*opcode), *payload_len);
}
/*!
@@ -1108,9 +1282,9 @@ int AST_OPTIONAL_API_NAME(ast_websocket_remove_protocol)(const char *name, ast_w
* path will contain everything after the address/port if included.
*/
static int websocket_client_parse_uri(const char *uri, char **host,
- struct ast_str **path, char **userinfo)
+ struct ast_str **path, char **userinfo, int proxy)
{
- struct ast_uri *parsed_uri = ast_uri_parse_websocket(uri);
+ struct ast_uri *parsed_uri = proxy ? ast_uri_parse_http(uri) : ast_uri_parse_websocket(uri);
if (!parsed_uri) {
return -1;
@@ -1118,7 +1292,6 @@ static int websocket_client_parse_uri(const char *uri, char **host,
*host = ast_uri_make_host_with_port(parsed_uri);
*userinfo = ast_strdup(ast_uri_user_info(parsed_uri));
-
if (ast_uri_path(parsed_uri) || ast_uri_query(parsed_uri)) {
*path = ast_str_create(64);
if (!*path) {
@@ -1155,13 +1328,13 @@ static void websocket_client_args_destroy(void *obj)
ast_free(args->tls_cfg);
}
-static struct ast_tcptls_session_args *websocket_client_args_create(
- const char *host, struct ast_tls_config *tls_cfg,
- enum ast_websocket_result *result)
+static struct ast_tcptls_session_args *websocket_client_args_create(struct ast_websocket *ws,
+ struct ast_websocket_client_options *options, enum ast_websocket_result *result)
{
struct ast_sockaddr *addr;
struct ast_tcptls_session_args *args = ao2_alloc(
sizeof(*args), websocket_client_args_destroy);
+ const char *resolve_host = NULL;
if (!args) {
*result = WS_ALLOCATE_ERROR;
@@ -1169,12 +1342,17 @@ static struct ast_tcptls_session_args *websocket_client_args_create(
}
args->accept_fd = -1;
- args->tls_cfg = tls_cfg;
+ args->tls_cfg = options->tls_cfg;
args->name = "websocket client";
- if (!ast_sockaddr_resolve(&addr, host, 0, 0)) {
+ if (!ast_strlen_zero(ws->client->options->proxy_host)) {
+ resolve_host = ws->client->options->proxy_host;
+ } else {
+ resolve_host = ws->client->host;
+ }
+ if (!ast_sockaddr_resolve(&addr, resolve_host, 0, 0)) {
ast_log(LOG_ERROR, "Unable to resolve address %s\n",
- host);
+ resolve_host);
ao2_ref(args, -1);
*result = WS_URI_RESOLVE_ERROR;
return NULL;
@@ -1185,7 +1363,7 @@ static struct ast_tcptls_session_args *websocket_client_args_create(
/* We need to save off the hostname but it may contain a port spec */
snprintf(args->hostname, sizeof(args->hostname),
"%.*s",
- (int) strcspn(host, ":"), host);
+ (int) strcspn(ws->client->host, ":"), ws->client->host);
return args;
}
@@ -1213,33 +1391,13 @@ static char *websocket_client_create_key(void)
return encoded;
}
-struct websocket_client {
- /*! host portion of client uri */
- char *host;
- /*! path for logical websocket connection */
- struct ast_str *resource_name;
- /*! unique key used during server handshaking */
- char *key;
- /*! container for registered protocols */
- char *protocols;
- /*! the protocol accepted by the server */
- char *accept_protocol;
- /*! websocket protocol version */
- int version;
- /*! tcptls connection arguments */
- struct ast_tcptls_session_args *args;
- /*! tcptls connection instance */
- struct ast_tcptls_session_instance *ser;
- /*! Authentication userid:password */
- char *userinfo;
- /*! Suppress connection log messages */
- int suppress_connection_msgs;
-};
-
static void websocket_client_destroy(void *obj)
{
struct websocket_client *client = obj;
+ char *id = ast_strdupa(client->options->uri);
+ SCOPE_ENTER(2, "%s: Client destructor %p\n", id, obj);
+ ao2_cleanup(client->options);
ao2_cleanup(client->ser);
ao2_cleanup(client->args);
@@ -1249,13 +1407,76 @@ static void websocket_client_destroy(void *obj)
ast_free(client->resource_name);
ast_free(client->host);
ast_free(client->userinfo);
+ ast_free(client->proxy_userinfo);
+
+ SCOPE_EXIT_RTN("%s: Client destructor complete\n", id);
+}
+
+static void client_options_destroy(void *obj)
+{
+ struct ast_websocket_client_options *clone = obj;
+ ast_free((char *)clone->uri);
+ ast_free((char *)clone->protocols);
+ ast_free((char *)clone->username);
+ ast_free((char *)clone->password);
+ ast_free((char *)clone->proxy_host);
+ ast_free((char *)clone->proxy_username);
+ ast_free((char *)clone->proxy_password);
+ ast_free(clone->tls_cfg);
+}
+
+#define SAFE_STRDUP_WITH_ERROR_RTN(_clone, _str) \
+({ \
+ char *_duped = NULL; \
+ if (_str) { \
+ _duped = ast_strdup(_str); \
+ if (!_duped) { \
+ ao2_cleanup(_clone); \
+ return NULL; \
+ } \
+ } \
+ _duped; \
+})
+
+static struct ast_websocket_client_options *client_options_clone(
+ struct ast_websocket_client_options *options)
+{
+ struct ast_websocket_client_options *clone = NULL;
+
+ clone = ao2_alloc(sizeof(*clone), client_options_destroy);
+ if (!clone) {
+ ast_log(LOG_ERROR, "Unable to clone client options\n");
+ return NULL;
+ }
+
+ memcpy(clone, options, sizeof(*options));
+ clone->uri = SAFE_STRDUP_WITH_ERROR_RTN(clone, options->uri);
+ clone->protocols = SAFE_STRDUP_WITH_ERROR_RTN(clone, options->protocols);
+ clone->username = SAFE_STRDUP_WITH_ERROR_RTN(clone, options->username);
+ clone->password = SAFE_STRDUP_WITH_ERROR_RTN(clone, options->password);
+ clone->proxy_host = SAFE_STRDUP_WITH_ERROR_RTN(clone, options->proxy_host);
+ clone->proxy_username = SAFE_STRDUP_WITH_ERROR_RTN(clone, options->proxy_username);
+ clone->proxy_password = SAFE_STRDUP_WITH_ERROR_RTN(clone, options->proxy_password);
+ if (options->tls_cfg) {
+ clone->tls_cfg = ast_calloc(1, sizeof(*options->tls_cfg));
+ if (!clone->tls_cfg) {
+ ao2_cleanup(clone);
+ return NULL;
+ }
+ memcpy(clone->tls_cfg, options->tls_cfg, sizeof(*options->tls_cfg));
+ }
+
+ return clone;
}
static struct ast_websocket * websocket_client_create(
struct ast_websocket_client_options *options, enum ast_websocket_result *result)
{
- struct ast_websocket *ws = ao2_alloc(sizeof(*ws), session_destroy_fn);
+ struct ast_websocket *ws = NULL;
+ ast_debug(2, "%s: Creating client\n", options->uri);
+
+ ws = ao2_alloc(sizeof(*ws), session_destroy_fn);
if (!ws) {
ast_log(LOG_ERROR, "Unable to allocate websocket\n");
*result = WS_ALLOCATE_ERROR;
@@ -1270,12 +1491,21 @@ static struct ast_websocket * websocket_client_create(
}
if (!(ws->client = ao2_alloc(
- sizeof(*ws->client), websocket_client_destroy))) {
+ sizeof(*ws->client), websocket_client_destroy))) {
ast_log(LOG_ERROR, "Unable to allocate websocket client\n");
ao2_ref(ws, -1);
*result = WS_ALLOCATE_ERROR;
return NULL;
}
+ ws->client->ping_sched_timer = -1;
+
+ ws->client->options = client_options_clone(options);
+ if (!ws->client->options) {
+ ast_log(LOG_ERROR, "Unable to clone client options\n");
+ ao2_ref(ws, -1);
+ *result = WS_ALLOCATE_ERROR;
+ return NULL;
+ }
if (!(ws->client->key = websocket_client_create_key())) {
ao2_ref(ws, -1);
@@ -1285,11 +1515,14 @@ static struct ast_websocket * websocket_client_create(
if (websocket_client_parse_uri(
options->uri, &ws->client->host, &ws->client->resource_name,
- &ws->client->userinfo)) {
+ &ws->client->userinfo, 0)) {
ao2_ref(ws, -1);
*result = WS_URI_PARSE_ERROR;
return NULL;
}
+ ast_debug(2, "%s: host: %s resource: %s userinfo: %s\n", options->uri, ws->client->host,
+ ws->client->resource_name ? ast_str_buffer(ws->client->resource_name) : "",
+ S_OR(ws->client->userinfo, ""));
if (ast_strlen_zero(ws->client->userinfo)
&& !ast_strlen_zero(options->username)
@@ -1298,8 +1531,20 @@ static struct ast_websocket * websocket_client_create(
options->password);
}
- if (!(ws->client->args = websocket_client_args_create(
- ws->client->host, options->tls_cfg, result))) {
+ if (!ast_strlen_zero(options->proxy_host)) {
+ ast_debug(2, "%s: Proxy host: %s userinfo: %s\n", options->uri, ws->client->options->proxy_host,
+ S_OR(ws->client->proxy_userinfo, ""));
+
+ if (ast_strlen_zero(ws->client->proxy_userinfo)
+ && !ast_strlen_zero(options->proxy_username)
+ && !ast_strlen_zero(options->proxy_password)) {
+ ast_asprintf(&ws->client->proxy_userinfo, "%s:%s", options->proxy_username,
+ options->proxy_password);
+ }
+
+ }
+
+ if (!(ws->client->args = websocket_client_args_create(ws, options, result))) {
ao2_ref(ws, -1);
return NULL;
}
@@ -1322,7 +1567,7 @@ const char * AST_OPTIONAL_API_NAME(
}
static enum ast_websocket_result websocket_client_handle_response_code(
- struct websocket_client *client, int response_code)
+ struct websocket_client *client, int response_code, int proxy)
{
if (response_code <= 0) {
return WS_INVALID_RESPONSE;
@@ -1330,7 +1575,15 @@ static enum ast_websocket_result websocket_client_handle_response_code(
switch (response_code) {
case 101:
- return 0;
+ if (!proxy) {
+ return WS_OK;
+ }
+ break;
+ case 200:
+ if (proxy) {
+ return WS_OK;
+ }
+ break;
case 400:
if (!client->suppress_connection_msgs) {
ast_log(LOG_ERROR, "Received response 400 - Bad Request "
@@ -1353,13 +1606,13 @@ static enum ast_websocket_result websocket_client_handle_response_code(
if (!client->suppress_connection_msgs) {
ast_log(LOG_ERROR, "Invalid HTTP response code %d from %s\n",
- response_code, client->host);
+ response_code, proxy ? client->options->proxy_host : client->host);
}
return WS_INVALID_RESPONSE;
}
static enum ast_websocket_result websocket_client_handshake_get_response(
- struct websocket_client *client)
+ struct websocket_client *client, int proxy)
{
enum ast_websocket_result res;
char buf[4096];
@@ -1368,20 +1621,25 @@ static enum ast_websocket_result websocket_client_handshake_get_response(
int has_connection = 0;
int has_accept = 0;
int has_protocol = 0;
+ int status_code = 0;
+ SCOPE_ENTER(2, "%s: Proxy? %s Proxy host: %s\n", client->options->uri, AST_YESNO(proxy),
+ S_OR(client->options->proxy_host, "N/A"));
while (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) <= 0) {
if (errno == EINTR || errno == EAGAIN) {
continue;
}
-
- ast_log(LOG_ERROR, "Unable to retrieve HTTP status line.");
- return WS_BAD_STATUS;
+ SCOPE_EXIT_LOG_RTN_VALUE(WS_BAD_STATUS, LOG_ERROR, "%s: Unable to retrieve HTTP status line\n",
+ client->options->uri);
}
- if ((res = websocket_client_handle_response_code(client,
- ast_http_response_status_line(
- buf, "HTTP/1.1", 101))) != WS_OK) {
- return res;
+ status_code = ast_http_response_status_line(buf, "HTTP/1.1", 101);
+ ast_trace(-1, "%s: Status code: %d\n", client->options->uri, status_code);
+
+ res = websocket_client_handle_response_code(client, status_code, proxy);
+ if (res != WS_OK) {
+ SCOPE_EXIT_RTN_VALUE(res, "%s: HTTP status line result: %d/%s\n", client->options->uri,
+ res, ast_websocket_result_to_str(res));
}
/* Ignoring line folding - assuming header field values are contained
@@ -1403,40 +1661,46 @@ static enum ast_websocket_result websocket_client_handshake_get_response(
break;
}
- if (parsed > 0) {
+ if (proxy || parsed > 0) {
continue;
}
if (!has_upgrade &&
(has_upgrade = ast_http_header_match(
name, "upgrade", value, "websocket")) < 0) {
- return WS_HEADER_MISMATCH;
+ SCOPE_EXIT_RTN_VALUE(WS_HEADER_MISMATCH);
} else if (!has_connection &&
(has_connection = ast_http_header_match(
name, "connection", value, "upgrade")) < 0) {
- return WS_HEADER_MISMATCH;
+ SCOPE_EXIT_RTN_VALUE(WS_HEADER_MISMATCH);
} else if (!has_accept &&
(has_accept = ast_http_header_match(
name, "sec-websocket-accept", value,
websocket_combine_key(
client->key, base64, sizeof(base64)))) < 0) {
- return WS_HEADER_MISMATCH;
+ SCOPE_EXIT_RTN_VALUE(WS_HEADER_MISMATCH);
} else if (!has_protocol &&
(has_protocol = ast_http_header_match_in(
name, "sec-websocket-protocol", value, client->protocols))) {
if (has_protocol < 0) {
- return WS_HEADER_MISMATCH;
+ SCOPE_EXIT_RTN_VALUE(WS_HEADER_MISMATCH);
}
client->accept_protocol = ast_strdup(value);
} else if (!strcasecmp(name, "sec-websocket-extensions")) {
ast_log(LOG_ERROR, "Extensions received, but not "
"supported by client\n");
- return WS_NOT_SUPPORTED;
+ SCOPE_EXIT_RTN_VALUE(WS_NOT_SUPPORTED);
}
}
- return has_upgrade && has_connection && has_accept ?
- WS_OK : WS_HEADER_MISSING;
+ if (proxy) {
+ res = WS_OK;
+ } else {
+ res = has_upgrade && has_connection && has_accept ?
+ WS_OK : WS_HEADER_MISSING;
+ }
+
+ SCOPE_EXIT_RTN_VALUE(res);
}
#define optional_header_spec "%s%s%s"
@@ -1445,6 +1709,42 @@ static enum ast_websocket_result websocket_client_handshake_get_response(
test ? value : "", \
test ? "\r\n" : ""
+static enum ast_websocket_result websocket_proxy_handshake(
+ struct websocket_client *client)
+{
+ struct ast_variable *auth_header = NULL;
+ enum ast_websocket_result res = WS_OK;
+ ssize_t bytes_written = 0;
+ SCOPE_ENTER(2, "%s: Handshaking with proxy %s\n", client->options->uri, client->options->proxy_host);
+
+ if (!ast_strlen_zero(client->proxy_userinfo)) {
+ auth_header = ast_http_create_basic_auth_header(client->proxy_userinfo, NULL);
+ if (!auth_header) {
+ SCOPE_EXIT_LOG_RTN_VALUE(WS_ALLOCATE_ERROR, LOG_ERROR, "Unable to allocate client websocket userinfo\n");
+ }
+ }
+
+ bytes_written = ast_iostream_printf(client->ser->stream,
+ "CONNECT %s HTTP/1.1\r\n"
+ "Host: %s\r\n"
+ optional_header_spec
+ "Proxy-Connection: Keep-Alive\r\n"
+ "\r\n",
+ client->host,
+ client->host,
+ print_optional_header(auth_header, "Proxy-Authorization: ", auth_header->value)
+ );
+
+ ast_variables_destroy(auth_header);
+ if (bytes_written < 0) {
+ SCOPE_EXIT_LOG_RTN_VALUE(WS_WRITE_ERROR, LOG_ERROR, "Failed to send handshake\n");
+ }
+ /* wait for a response before doing anything else */
+ res = websocket_client_handshake_get_response(client, 1);
+
+ SCOPE_EXIT_RTN_VALUE(res, "%s\n", ast_websocket_result_to_str(res));
+}
+
static enum ast_websocket_result websocket_client_handshake(
struct websocket_client *client)
{
@@ -1486,30 +1786,83 @@ static enum ast_websocket_result websocket_client_handshake(
return WS_WRITE_ERROR;
}
/* wait for a response before doing anything else */
- return websocket_client_handshake_get_response(client);
+ return websocket_client_handshake_get_response(client, 0);
}
-static enum ast_websocket_result websocket_client_connect(struct ast_websocket *ws, int timeout)
+static enum ast_websocket_result websocket_client_connect(struct ast_websocket *ws,
+ struct ast_websocket_client_options *options)
{
enum ast_websocket_result res;
+ int original_tls_enabled = ws->client->args->tls_cfg ? ws->client->args->tls_cfg->enabled : 0;
+ int proxy = !ast_strlen_zero(ws->client->options->proxy_host);
+ SCOPE_ENTER(2, "%s: proxy: %s tls_enabled: %s\n", options->uri, S_OR(ws->client->options->proxy_host, "N/A"),
+ AST_YESNO(original_tls_enabled));
+
+
/* create and connect the client - note client_start
releases the session instance on failure */
- if (!(ws->client->ser = ast_tcptls_client_start_timeout(
- ast_tcptls_client_create(ws->client->args), timeout))) {
- return WS_CLIENT_START_ERROR;
+
+ if (proxy && original_tls_enabled) {
+ ast_trace(-1, "%s: Disabling TLS while handshaking with proxy\n", options->uri);
+ if (ws->client->args->tls_cfg) {
+ ws->client->args->tls_cfg->enabled = 0;
+ }
+ }
+
+ ast_trace(-1, "%s: Creating tcptls client\n", options->uri);
+ ws->client->ser = ast_tcptls_client_create(ws->client->args);
+ if (!ws->client->ser) {
+ SCOPE_EXIT_LOG_RTN_VALUE(WS_CLIENT_START_ERROR, LOG_ERROR, "%s: Unable to create tcptls client\n", options->uri);
+ }
+
+ ast_trace(-1, "%s: Connecting%s%s\n", options->uri, proxy ? " via proxy " : "", S_OR(ws->client->options->proxy_host, ""));
+ if (!ast_tcptls_client_start_timeout(ws->client->ser, options->timeout)) {
+ ws->client->ser = NULL;
+ SCOPE_EXIT_LOG_RTN_VALUE(WS_CLIENT_START_ERROR, LOG_ERROR, "%s: Unable to connect%s%s\n", options->uri,
+ proxy ? " via proxy " : "", S_OR(ws->client->options->proxy_host, ""));
+ }
+
+ ast_trace(-1, "%s: Connected%s%s\n", options->uri, proxy ? " via proxy " : "", S_OR(ws->client->options->proxy_host, ""));
+ if (proxy) {
+ res = websocket_proxy_handshake(ws->client);
+ if (res != WS_OK) {
+ ao2_ref(ws->client->ser, -1);
+ ws->client->ser = NULL;
+ SCOPE_EXIT_LOG_RTN_VALUE(res, LOG_ERROR, "%s: Unable to perform proxy handshake with %s\n", options->uri,
+ ws->client->options->proxy_host);
+ }
+ }
+
+ if (proxy && original_tls_enabled && !ws->client->args->tls_cfg->enabled) {
+ int rc = 0;
+ ast_trace(-1, "%s: Re-enabling TLS after handshaking with proxy %s\n", options->uri, ws->client->options->proxy_host);
+ ws->client->args->tls_cfg->enabled = 1;
+ rc = ast_ssl_setup_client(ws->client->args->tls_cfg);
+ if (rc != 1) {
+ ao2_cleanup(ws->client->ser);
+ ws->client->ser = NULL;
+ SCOPE_EXIT_LOG_RTN_VALUE(WS_TLS_ERROR, LOG_ERROR, "%s: TLS context setup failed after handshake with %s\n",
+ options->uri, ws->client->options->proxy_host);
+ }
+ if (!ast_tcptls_start_tls(ws->client->ser)) {
+ ws->client->ser = NULL;
+ SCOPE_EXIT_LOG_RTN_VALUE(WS_TLS_ERROR, LOG_ERROR, "%s: TLS with websocket server failed after handshake with %s\n",
+ options->uri, ws->client->options->proxy_host);
+ }
}
if ((res = websocket_client_handshake(ws->client)) != WS_OK) {
ao2_ref(ws->client->ser, -1);
ws->client->ser = NULL;
- return res;
+ SCOPE_EXIT_LOG_RTN_VALUE(res, LOG_ERROR, "%s: Unable to perform websocket handshake\n", options->uri);
}
ws->stream = ws->client->ser->stream;
ws->secure = ast_iostream_get_ssl(ws->stream) ? 1 : 0;
ws->client->ser->stream = NULL;
ast_sockaddr_copy(&ws->remote_address, &ws->client->ser->remote_address);
- return WS_OK;
+
+ SCOPE_EXIT_RTN_VALUE(WS_OK, "%s: Handshake complete\n", options->uri);
}
struct ast_websocket *AST_OPTIONAL_API_NAME(ast_websocket_client_create)
@@ -1526,21 +1879,125 @@ struct ast_websocket *AST_OPTIONAL_API_NAME(ast_websocket_client_create)
return ast_websocket_client_create_with_options(&options, result);
}
+static int ping_scheduler_callback(const void *obj)
+{
+ struct ast_websocket *session = (struct ast_websocket *)obj;
+
+ if (!session->client) {
+ /*
+ * We should never get here because we can only start pingpongs from a client
+ * but just in case...
+ */
+ return 0;
+ }
+
+ ao2_lock(session);
+ if (session->closing) {
+ ao2_unlock(session);
+ return 0;
+ }
+
+ if (session->client->missed_pong_count > 1) {
+ ast_debug(2, "%s: Missed PONG count is now %d\n", WS_SESSION_REMOTE(session),
+ session->client->missed_pong_count);
+ }
+
+ if (session->client->missed_pong_count >= session->client->options->pingpong_probes) {
+ ao2_unlock(session);
+ ast_log(LOG_WARNING, "%s: %d missed PONGs. Closing connection.\n",
+ WS_SESSION_REMOTE(session), session->client->missed_pong_count);
+ session->client->ping_sched_timer = -1;
+ websocket_close(session, AST_WEBSOCKET_STATUS_GOING_AWAY, 1);
+ return 0;
+ }
+
+ ast_websocket_write(session, AST_WEBSOCKET_OPCODE_PING, WS_PING_PAYLOAD, WS_PING_PAYLOAD_LEN);
+ session->client->missed_pong_count++;
+
+ ao2_unlock(session);
+ return session->client->options->pingpong_interval * 1000;
+}
+
+/*
+ * Notes on client session lifecycle:
+ *
+ * Historically, the lifecycle of a client session was fairly simple. Ownership was
+ * transferred to the caller with the return of the create and when the caller released
+ * their last reference, the session was closed by the destructor. There was one issue
+ * with this, however: if the remote end sent us a CLOSE opcode, we were destroying
+ * everything without sending the required CLOSE reply. This could cause issues on
+ * the remote end. The addition of WebSocket PING/PONG capability also doesn't work well
+ * with that pattern because it adds a scheduler and callback which means that a reference
+ * must be held by this module as long as the scheduler is active. This means that a caller
+ * can't just unref the websocket and expect it to be automatically closed.
+ *
+ * So now...
+ *
+ * Once the websocket is created and connected, the usual pattern is for the higher
+ * level client to be in a loop that blocks waiting on a websocket frame to be
+ * available. The client owns a reference to the websocket while the loop is
+ * active and if we're sending PINGs, the scheduled task also holds a reference.
+ *
+ * When the higher level client wants to close the websocket, it calls
+ * ast_websocket_close() from another thread. We mark the session as CLOSED_BY_US,
+ * stop the scheduled PING task and release its reference, send a CLOSE opcode to the
+ * remote, then return to the caller. The client's read thread is still blocked
+ * at this point. When we receive the CLOSE reply from the remote, we use the
+ * CLOSED_BY_US flag to indicate that we're done and can close the socket and stream.
+ * This causes the client's read thread to unblock with a CLOSE opcode which then calls
+ * ast_websocket_close() (which is a NoOp because we already did the close) and
+ * ast_websocket_unref() which triggers the session destructor.
+ *
+ * When the remote sends us a CLOSE opcode, we mark the session as CLOSED_BY_REMOTE
+ * and return the CLOSE opcode in the ast_websocket_read that the caller should have
+ * been blocked on. The caller must then call ast_websocket_close() just as above.
+ * In this case however, it's not a NoOp. We stop the scheduled PING task and
+ * release its reference, send a CLOSE reply to the remote and since the transaction
+ * is done, we close the socket and stream and return to the caller. The caller then
+ * calls ast_websocket_unref() which triggers the session destructor.
+ */
+
struct ast_websocket *AST_OPTIONAL_API_NAME(ast_websocket_client_create_with_options)
(struct ast_websocket_client_options *options, enum ast_websocket_result *result)
{
- struct ast_websocket *ws = websocket_client_create(options, result);
+ struct ast_websocket *ws = NULL;
+ SCOPE_ENTER(2, "%s: Creating client\n", options->uri);
+ ws = websocket_client_create(options, result);
if (!ws) {
- return NULL;
+ SCOPE_EXIT_LOG_RTN_VALUE(NULL, LOG_ERROR, "%s: Failed to create: %s\n", options->uri, ast_websocket_result_to_str(*result));
}
- if ((*result = websocket_client_connect(ws, options->timeout)) != WS_OK) {
+ ast_trace(-1, "%s: Connecting\n", ws->client->options->uri);
+
+ if ((*result = websocket_client_connect(ws, options)) != WS_OK) {
ao2_ref(ws, -1);
- return NULL;
+ SCOPE_EXIT_RTN_VALUE(NULL, "%s: Failed to connect: %s\n", options->uri, ast_websocket_result_to_str(*result));
}
+ ast_trace(-1, "%s: Connected\n", ws->client->options->uri);
- return ws;
+ if (ws->client->options->tcp_keepalives) {
+ int sockfd = ast_iostream_get_fd(ws->stream);
+ int enabled = 1;
+
+ setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled));
+ setsockopt(sockfd, IPPROTO_TCP, TCP_KEEPIDLE, &options->tcp_keepalive_time, sizeof(options->tcp_keepalive_time));
+ setsockopt(sockfd, IPPROTO_TCP, TCP_KEEPINTVL, &options->tcp_keepalive_interval, sizeof(options->tcp_keepalive_interval));
+ setsockopt(sockfd, IPPROTO_TCP, TCP_KEEPCNT, &options->tcp_keepalive_probes, sizeof(options->tcp_keepalive_probes));
+ ast_trace(-1, "%s: Enabled TCP keepalives\n", ws->client->options->uri);
+ }
+
+ if (ws->client->options->pingpongs) {
+ ws->client->ping_sched_timer = ast_sched_add(ping_scheduler, ws->client->options->pingpong_interval * 1000,
+ ping_scheduler_callback, ao2_bump(ws));
+ if (ws->client->ping_sched_timer < 0) {
+ ast_log(LOG_WARNING, "%s: Unable to schedule PING/PONG keepalives\n", ws->client->options->uri);
+ ao2_ref(ws, -1);
+ } else {
+ ast_trace(-1, "%s: Enabled PING/PONG keepalives\n", ws->client->options->uri);
+ }
+ }
+ SCOPE_EXIT_RTN_VALUE(ws, "%s: Client created and connected %p %p\n", options->uri, ws, ws->client);
}
int AST_OPTIONAL_API_NAME(ast_websocket_read_string)
@@ -1669,6 +2126,21 @@ const char *AST_OPTIONAL_API_NAME(ast_websocket_status_to_str)
return "Unknown";
}
+static int unload_module(void)
+{
+ if (ping_scheduler) {
+ ast_sched_context_destroy(ping_scheduler);
+ ping_scheduler = NULL;
+ }
+
+ websocket_remove_protocol_internal("echo", websocket_echo_callback);
+ ast_http_uri_unlink(&websocketuri);
+ ao2_ref(websocketuri.data, -1);
+ websocketuri.data = NULL;
+
+ return 0;
+}
+
static int load_module(void)
{
websocketuri.data = websocket_server_internal_create();
@@ -1678,15 +2150,16 @@ static int load_module(void)
ast_http_uri_link(&websocketuri);
websocket_add_protocol_internal("echo", websocket_echo_callback);
- return 0;
-}
+ ping_scheduler = ast_sched_context_create();
+ if (!ping_scheduler) {
+ unload_module();
+ return AST_MODULE_LOAD_DECLINE;
+ }
-static int unload_module(void)
-{
- websocket_remove_protocol_internal("echo", websocket_echo_callback);
- ast_http_uri_unlink(&websocketuri);
- ao2_ref(websocketuri.data, -1);
- websocketuri.data = NULL;
+ if (ast_sched_start_thread(ping_scheduler)) {
+ unload_module();
+ return AST_MODULE_LOAD_DECLINE;
+ }
return 0;
}
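The TCP keepalive setup added to ast_websocket_client_create_with_options comes down to four setsockopt() calls. The following is a minimal standalone sketch of that sequence, not the commit's code: the helper name enable_tcp_keepalives and its parameters are hypothetical, and it assumes Linux, where TCP_KEEPIDLE, TCP_KEEPINTVL and TCP_KEEPCNT are available under those names. Unlike the hunk above, it checks each return value, and each length argument is the size of the int itself rather than of a pointer to it.

```c
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/*
 * Hypothetical helper (not part of Asterisk): enable TCP keepalives on fd.
 *   idle     - seconds of inactivity before the first probe (TCP_KEEPIDLE)
 *   interval - seconds between probes (TCP_KEEPINTVL)
 *   probes   - unanswered probes before the kernel drops the connection (TCP_KEEPCNT)
 * Returns 0 on success, -1 if any setsockopt() call fails.
 */
static int enable_tcp_keepalives(int fd, int idle, int interval, int probes)
{
	int enabled = 1;

	if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled)) < 0) {
		return -1;
	}
	if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle)) < 0) {
		return -1;
	}
	if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval)) < 0) {
		return -1;
	}
	if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &probes, sizeof(probes)) < 0) {
		return -1;
	}

	return 0;
}
```

A caller would pass the connected socket's descriptor together with the configured tcp_keepalive_time, tcp_keepalive_interval and tcp_keepalive_probes values, and could log a warning if the helper returns -1 instead of silently continuing without keepalives.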
diff --git a/res/res_websocket_client.c b/res/res_websocket_client.c
index 290021b6c1..fbc00fc2f9 100644
--- a/res/res_websocket_client.c
+++ b/res/res_websocket_client.c
@@ -221,6 +221,128 @@ verify_server_hostname = no
</since>
<synopsis>If set to true, verify that the server's hostname matches the common name in it's certificate. (optional)</synopsis>
</configOption>
+ <configOption name="proxy_host">
+ <since>
+ <version>20.21.0</version>
+ <version>22.11.0</version>
+ <version>23.5.0</version>
+ </since>
+ <synopsis>Proxy host including port for outbound proxy if required. (optional)</synopsis>
+ <description>
+ <para>
+ If an outbound proxy is required to reach the websocket server,
+ specify a host in the form <literal>&lt;host&gt;:&lt;port&gt;</literal>.
+ Currently only HTTP (non-TLS) proxies are supported, although the tunnelled
+ connection to the websocket server can have TLS enabled.
+ </para>
+ </description>
+ </configOption>
+ <configOption name="proxy_username">
+ <since>
+ <version>20.21.0</version>
+ <version>22.11.0</version>
+ <version>23.5.0</version>
+ </since>
+ <synopsis>Proxy authentication username if required. (optional)</synopsis>
+ </configOption>
+ <configOption name="proxy_password">
+ <since>
+ <version>20.21.0</version>
+ <version>22.11.0</version>
+ <version>23.5.0</version>
+ </since>
+ <synopsis>Proxy authentication password if required. (optional)</synopsis>
+ </configOption>
+ <configOption name="enable_tcp_keepalives">
+ <since>
+ <version>20.21.0</version>
+ <version>22.11.0</version>
+ <version>23.5.0</version>
+ </since>
+ <synopsis>Enable TCP Keepalives. (optional)</synopsis>
+ </configOption>
+ <configOption name="tcp_keepalive_time">
+ <since>
+ <version>20.21.0</version>
+ <version>22.11.0</version>
+ <version>23.5.0</version>
+ </since>
+ <synopsis>Start sending keepalives when no data has been sent for this many seconds. (optional)</synopsis>
+ </configOption>
+ <configOption name="tcp_keepalive_interval">
+ <since>
+ <version>20.21.0</version>
+ <version>22.11.0</version>
+ <version>23.5.0</version>
+ </since>
+ <synopsis>Send keepalives at this interval in seconds. (optional)</synopsis>
+ <description>
+ <para>
+ If a reply isn't received by the time the next keepalive is due
+ to be sent, it's considered missed so this option also controls
+ how long it takes to detect a failure.
+ </para>
+ </description>
+ </configOption>
+ <configOption name="tcp_keepalive_probes">
+ <since>
+ <version>20.21.0</version>
+ <version>22.11.0</version>
+ <version>23.5.0</version>
+ </since>
+ <synopsis>Close the connection after this many missed replies. (optional)</synopsis>
+ <description>
+ <para>
+ If a reply isn't received by the time the next keepalive is due
+ to be sent, it's considered missed. The time to detect a failure
+ is therefore between (probes * interval) and
+ ((probes + 1) * interval) seconds. If the connection closes
+ and reconnect_interval and reconnect_attempts are set, a new connection
+ will be attempted using those parameters.
+ </para>
+ </description>
+ </configOption>
+ <configOption name="enable_pingpongs">
+ <since>
+ <version>20.21.0</version>
+ <version>22.11.0</version>
+ <version>23.5.0</version>
+ </since>
+ <synopsis>Enable WebSocket PING/PONGs. (optional)</synopsis>
+ </configOption>
+ <configOption name="pingpong_interval">
+ <since>
+ <version>20.21.0</version>
+ <version>22.11.0</version>
+ <version>23.5.0</version>
+ </since>
+ <synopsis>Send WebSocket PINGs at this interval in seconds. (optional)</synopsis>
+ <description>
+ <para>
+ If a reply isn't received by the time the next PING is due
+ to be sent, it's considered missed so this option also controls
+ how long it takes to detect a failure.
+ </para>
+ </description>
+ </configOption>
+ <configOption name="pingpong_probes">
+ <since>
+ <version>20.21.0</version>
+ <version>22.11.0</version>
+ <version>23.5.0</version>
+ </since>
+ <synopsis>Close the connection after this many missed PONG replies. (optional)</synopsis>
+ <description>
+ <para>
+ If a reply isn't received by the time the next PING is due
+ to be sent, it's considered missed. The time to detect a failure
+ is therefore between (probes * interval) and
+ ((probes + 1) * interval) seconds. If the connection closes
+ and reconnect_interval and reconnect_attempts are set, a new connection
+ will be attempted using those parameters.
+ </para>
+ </description>
+ </configOption>
</configObject>
</configFile>
</configInfo>
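The pingpong_probes description above gives the failure detection time as a window between (probes * interval) and ((probes + 1) * interval) seconds. A small sketch of that arithmetic follows; the struct and function names are hypothetical and exist only for illustration, not in the commit.

```c
/* Hypothetical illustration type: bounds of the failure detection window. */
struct detect_window {
	unsigned int min_secs;
	unsigned int max_secs;
};

/*
 * Compute the detection window described in the option documentation.
 * A peer that dies just before a PING is due is noticed after
 * probes * interval seconds; one that dies just after answering a PING
 * is noticed after (probes + 1) * interval seconds.
 */
static struct detect_window keepalive_detect_window(unsigned int interval, unsigned int probes)
{
	struct detect_window w;

	w.min_secs = probes * interval;
	w.max_secs = (probes + 1) * interval;

	return w;
}
```

With the defaults this commit registers (pingpong_interval of 20 and pingpong_probes of 3), the sketch yields a window of 60 to 80 seconds, which is worth keeping in mind when choosing values for links that sit behind proxies with shorter idle timeouts.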
@@ -276,6 +398,16 @@ struct ast_websocket *ast_websocket_client_connect(struct ast_websocket_client *
.password = wc->password,
.timeout = wc->connect_timeout,
.suppress_connection_msgs = 1,
+ .proxy_host = wc->proxy_host,
+ .proxy_username = wc->proxy_username,
+ .proxy_password = wc->proxy_password,
+ .tcp_keepalives = wc->tcp_keepalives,
+ .tcp_keepalive_time = wc->tcp_keepalive_time,
+ .tcp_keepalive_interval = wc->tcp_keepalive_interval,
+ .tcp_keepalive_probes = wc->tcp_keepalive_probes,
+ .pingpongs = wc->pingpongs,
+ .pingpong_interval = wc->pingpong_interval,
+ .pingpong_probes = wc->pingpong_probes,
.tls_cfg = NULL,
};
@@ -383,6 +515,21 @@ static void *wc_alloc(const char *id)
return NULL;
}
+ if (ast_string_field_init_extended(wc, proxy_host) != 0) {
+ ao2_cleanup(wc);
+ return NULL;
+ }
+
+ if (ast_string_field_init_extended(wc, proxy_username) != 0) {
+ ao2_cleanup(wc);
+ return NULL;
+ }
+
+ if (ast_string_field_init_extended(wc, proxy_password) != 0) {
+ ao2_cleanup(wc);
+ return NULL;
+ }
+
ast_debug(2, "%s: Allocated websocket client config\n", id);
return wc;
}
@@ -438,6 +585,29 @@ static int wc_apply(const struct ast_sorcery *sorcery, void *obj)
res = -1;
}
+ if (!ast_strlen_zero(wc->proxy_host)) {
+ char *host = NULL;
+ char *port = NULL;
+ char *s = ast_strdupa(wc->proxy_host);
+ if (!ast_sockaddr_split_hostport(s, &host, &port, PARSE_PORT_REQUIRE)) {
+ ast_log(LOG_WARNING, "%s: proxy_host '%s' is missing a port\n", id, wc->proxy_host);
+ res = -1;
+ }
+ }
+
+ if (wc->tcp_keepalives) {
+ if (!wc->tcp_keepalive_time || !wc->tcp_keepalive_interval || !wc->tcp_keepalive_probes) {
+ ast_log(LOG_WARNING, "%s: tcp_keepalive_time, tcp_keepalive_interval and tcp_keepalive_probes must all be non-zero\n", id);
+ res = -1;
+ }
+ }
+
+ if (wc->pingpongs) {
+ if (!wc->pingpong_interval || !wc->pingpong_probes) {
+ ast_log(LOG_WARNING, "%s: pingpong_interval and pingpong_probes must be non-zero\n", id);
+ res = -1;
+ }
+ }
if (res != 0) {
ast_log(LOG_WARNING, "%s: Websocket client configuration failed\n", id);
} else {
@@ -526,6 +696,26 @@ enum ast_ws_client_fields ast_websocket_client_get_field_diff(
changed |= AST_WS_CLIENT_FIELD_VERIFY_SERVER_CERT;
} else if (ast_strings_equal(v->name, "verify_server_hostname")) {
changed |= AST_WS_CLIENT_FIELD_VERIFY_SERVER_HOSTNAME;
+ } else if (ast_strings_equal(v->name, "proxy_host")) {
+ changed |= AST_WS_CLIENT_FIELD_PROXY_HOST;
+ } else if (ast_strings_equal(v->name, "proxy_username")) {
+ changed |= AST_WS_CLIENT_FIELD_PROXY_USERNAME;
+ } else if (ast_strings_equal(v->name, "proxy_password")) {
+ changed |= AST_WS_CLIENT_FIELD_PROXY_PASSWORD;
+ } else if (ast_strings_equal(v->name, "enable_tcp_keepalives")) {
+ changed |= AST_WS_CLIENT_FIELD_TCP_KEEPALIVES;
+ } else if (ast_strings_equal(v->name, "tcp_keepalive_time")) {
+ changed |= AST_WS_CLIENT_FIELD_TCP_KEEPALIVE_TIME;
+ } else if (ast_strings_equal(v->name, "tcp_keepalive_interval")) {
+ changed |= AST_WS_CLIENT_FIELD_TCP_KEEPALIVE_INTERVAL;
+ } else if (ast_strings_equal(v->name, "tcp_keepalive_probes")) {
+ changed |= AST_WS_CLIENT_FIELD_TCP_KEEPALIVE_PROBES;
+ } else if (ast_strings_equal(v->name, "enable_pingpongs")) {
+ changed |= AST_WS_CLIENT_FIELD_PINGPONGS;
+ } else if (ast_strings_equal(v->name, "pingpong_interval")) {
+ changed |= AST_WS_CLIENT_FIELD_PINGPONG_INTERVAL;
+ } else if (ast_strings_equal(v->name, "pingpong_probes")) {
+ changed |= AST_WS_CLIENT_FIELD_PINGPONG_PROBES;
} else {
ast_debug(2, "%s: Unknown change %s\n", new_id, v->name);
}
@@ -599,6 +789,16 @@ static int load_module(void)
ast_sorcery_register_int(websocket_client, ast_websocket_client, connection_timeout, connect_timeout, 500);
ast_sorcery_register_int(websocket_client, ast_websocket_client, reconnect_attempts, reconnect_attempts, 4);
ast_sorcery_register_int(websocket_client, ast_websocket_client, reconnect_interval, reconnect_interval, 500);
+ ast_sorcery_register_sf(websocket_client, ast_websocket_client, proxy_host, proxy_host, "");
+ ast_sorcery_register_sf(websocket_client, ast_websocket_client, proxy_username, proxy_username, "");
+ ast_sorcery_register_sf(websocket_client, ast_websocket_client, proxy_password, proxy_password, "");
+ ast_sorcery_register_bool(websocket_client, ast_websocket_client, enable_tcp_keepalives, tcp_keepalives, "no");
+ ast_sorcery_register_uint(websocket_client, ast_websocket_client, tcp_keepalive_time, tcp_keepalive_time, 20);
+ ast_sorcery_register_uint(websocket_client, ast_websocket_client, tcp_keepalive_interval, tcp_keepalive_interval, 20);
+ ast_sorcery_register_uint(websocket_client, ast_websocket_client, tcp_keepalive_probes, tcp_keepalive_probes, 3);
+ ast_sorcery_register_bool(websocket_client, ast_websocket_client, enable_pingpongs, pingpongs, "no");
+ ast_sorcery_register_uint(websocket_client, ast_websocket_client, pingpong_interval, pingpong_interval, 20);
+ ast_sorcery_register_uint(websocket_client, ast_websocket_client, pingpong_probes, pingpong_probes, 3);
ast_sorcery_load(sorcery);