Commit f29ce05e3e for asterisk.org
commit f29ce05e3e5262a2040e86e89f98638453b032e0
Author: Mike Bradeen <mbradeen@sangoma.com>
Date: Tue Oct 28 15:26:03 2025 -0600
taskprocessors: Improve logging and add new cli options
This change makes some small changes to improve log readability in
addition to the following changes:
Modified 'core show taskprocessors' to now show Low time and High time
for task execution.
New command 'core show taskprocessor name <taskprocessor-name>' to dump
taskprocessor info and current queue.
Additionally, a new test was added to demonstrate the 'show taskprocessor
name' functionality:
test execute category /main/taskprocessor/ name taskprocessor_cli_show
Setting 'core set debug 3 taskprocessor.c' will now log pushed tasks.
(Warning: this will cause extremely high levels of logging at even
low traffic levels.)
Resolves: #1566
UserNote: New CLI command has been added -
core show taskprocessor name <taskprocessor-name>
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 2fafd5790a..55461910a9 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -2076,7 +2076,11 @@ struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg);
* \retval 0 Success
* \retval -1 Failure
*/
-int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+int __ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+ const char *file, int line, const char *function);
+
+#define ast_sip_push_task(serializer, sip_task, task_data) \
+ __ast_sip_push_task(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Push a task to SIP servants and wait for it to complete.
@@ -2112,13 +2116,19 @@ int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void
* \return sip_task() return value on success.
* \retval -1 Failure to push the task.
*/
-int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+int __ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+ const char *file, int line, const char *function);
+#define ast_sip_push_task_wait_servant(serializer, sip_task, task_data) \
+ __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Push a task to SIP servants and wait for it to complete.
* \deprecated Replaced with ast_sip_push_task_wait_servant().
*/
-int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+int __ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+ const char *file, int line, const char *function);
+#define ast_sip_push_task_synchronous(serializer, sip_task, task_data) \
+ __ast_sip_push_task_synchronous(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Push a task to the serializer and wait for it to complete.
@@ -2162,7 +2172,10 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si
* \return sip_task() return value on success.
* \retval -1 Failure to push the task.
*/
-int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+int __ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+ const char *file, int line, const char *function);
+#define ast_sip_push_task_wait_serializer(serializer, sip_task, task_data) \
+ __ast_sip_push_task_wait_serializer(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Determine if the current thread is a SIP servant thread
diff --git a/include/asterisk/taskpool.h b/include/asterisk/taskpool.h
index 2a4f963052..a2adc3e93b 100644
--- a/include/asterisk/taskpool.h
+++ b/include/asterisk/taskpool.h
@@ -197,8 +197,11 @@ long ast_taskpool_queue_size(struct ast_taskpool *pool);
* \retval 0 success
* \retval -1 failure
*/
-int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+int __ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data,
+ const char *file, int line, const char *function)
attribute_warn_unused_result;
+#define ast_taskpool_push(pool, task, data) \
+ __ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Push a task to the taskpool, and wait for completion
@@ -214,8 +217,11 @@ int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *
* \retval 0 success
* \retval -1 failure
*/
-int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+int __ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data,
+ const char *file, int line, const char *function)
attribute_warn_unused_result;
+#define ast_taskpool_push_wait(pool, task, data) \
+ __ast_taskpool_push_wait(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Shut down a taskpool and remove the underlying taskprocessors
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index 3e3886eb1e..6477fc382b 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -214,8 +214,10 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
* \retval -1 failure
* \since 1.6.1
*/
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
- attribute_warn_unused_result;
+int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap,
+ const char *file, int line, const char *function) attribute_warn_unused_result;
+#define ast_taskprocessor_push(tps, task_exe, datap) \
+ __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*! \brief Local data parameter */
struct ast_taskprocessor_local {
@@ -240,9 +242,11 @@ struct ast_taskprocessor_local {
* \retval -1 failure
* \since 12.0.0
*/
-int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
- int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
- attribute_warn_unused_result;
+int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
+ int (*task_exe)(struct ast_taskprocessor_local *local), void *datap,
+ const char *file, int line, const char *function) attribute_warn_unused_result;
+#define ast_taskprocessor_push_local(tps, task_exe, datap) \
+ __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Indicate the taskprocessor is suspended.
diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h
index 72b2863c5d..2d759510cb 100644
--- a/include/asterisk/threadpool.h
+++ b/include/asterisk/threadpool.h
@@ -188,8 +188,11 @@ void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int siz
* \retval 0 success
* \retval -1 failure
*/
-int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
- attribute_warn_unused_result;
+int __ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data,
+ const char *file, int line, const char *function) attribute_warn_unused_result;
+
+#define ast_threadpool_push(pool, task, data) \
+ __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Shut down a threadpool and destroy it
diff --git a/main/taskpool.c b/main/taskpool.c
index 59ac4b0c72..c1d92a7a12 100644
--- a/main/taskpool.c
+++ b/main/taskpool.c
@@ -519,7 +519,13 @@ static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpoo
}
}
-int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+#undef ast_taskpool_push
+#define ast_taskpool_push_internal(pool, task, data) \
+ __ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data);
+
+int __ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data,
+ const char *file, int line, const char *function)
{
RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup);
@@ -555,13 +561,19 @@ int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *
return -1;
}
- if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) {
+ if (__ast_taskprocessor_push(taskprocessor->taskprocessor, task, data, file, line, function)) {
return -1;
}
return 0;
}
+/* ABI compatibility: Provide actual function symbol for external modules */
+int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+{
+ return __ast_taskpool_push(pool, task, data, NULL, 0, NULL);
+}
+
/*!
* \internal Structure used for synchronous task
*/
@@ -620,7 +632,8 @@ static int taskpool_sync_task(void *data)
return ret;
}
-int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+int __ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data,
+ const char *file, int line, const char *function)
{
struct taskpool_sync_task sync_task;
@@ -635,7 +648,7 @@ int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), v
return -1;
}
- if (ast_taskpool_push(pool, taskpool_sync_task, &sync_task)) {
+ if (__ast_taskpool_push(pool, taskpool_sync_task, &sync_task, file, line, function)) {
taskpool_sync_task_cleanup(&sync_task);
return -1;
}
@@ -650,6 +663,15 @@ int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), v
return sync_task.fail;
}
+/* ABI compatibility: Provide actual function symbol for external modules */
+#undef ast_taskpool_push_wait
+int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data);
+
+int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+{
+ return __ast_taskpool_push_wait(pool, task, data, NULL, 0, NULL);
+}
+
void ast_taskpool_shutdown(struct ast_taskpool *pool)
{
if (!pool) {
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index c4015279c5..8ba6e0f6c2 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -55,6 +55,10 @@ struct tps_task {
/*! \brief AST_LIST_ENTRY overhead */
AST_LIST_ENTRY(tps_task) list;
unsigned int wants_local:1;
+ /*! \brief Debug information about where the task was pushed from */
+ const char *file;
+ int line;
+ const char *function;
};
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@@ -63,6 +67,16 @@ struct tps_taskprocessor_stats {
unsigned long max_qsize;
/*! \brief This is the current number of tasks processed */
unsigned long _tasks_processed_count;
+ /*! \brief Highest time (in microseconds) spent processing a task */
+ long highest_time_processed;
+ /*! \brief Lowest time (in microseconds) spent processing a task */
+ long lowest_time_processed;
+ /*! \brief File where the highest time task was pushed from */
+ const char *highest_time_task_file;
+ /*! \brief Line where the highest time task was pushed from */
+ int highest_time_task_line;
+ /*! \brief Function where the highest time task was pushed from */
+ const char *highest_time_task_function;
};
/*! \brief A ast_taskprocessor structure is a singleton by name */
@@ -155,6 +169,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags);
@@ -162,6 +177,7 @@ static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags);
static struct ast_cli_entry taskprocessor_clis[] = {
AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
+ AST_CLI_DEFINE(cli_tps_show_taskprocessor, "Display detailed info about a taskprocessor"),
AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
AST_CLI_DEFINE(cli_tps_reset_stats, "Reset a named task processor's stats"),
AST_CLI_DEFINE(cli_tps_reset_stats_all, "Reset all task processors' stats"),
@@ -189,6 +205,17 @@ static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listene
listener->user_data = NULL;
}
+/* Keeping the old symbols for ABI compatibility */
+#undef ast_taskprocessor_push
+#define ast_taskprocessor_push_internal(tps, task_exe, datap) \
+ __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
+
+#undef ast_taskprocessor_push_local
+#define ast_taskprocessor_push_local_internal(tps, task_exe, datap) \
+ __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap);
+
/*!
* \brief Function that processes tasks in the taskprocessor
* \internal
@@ -204,8 +231,8 @@ static void *default_tps_processing_function(void *data)
while (!pvt->dead) {
res = ast_sem_wait(&pvt->sem);
if (res != 0 && errno != EINTR) {
- ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
- strerror(errno));
+ ast_log(LOG_ERROR, "Taskprocessor '%s': Semaphore wait failed: %s\n",
+ tps->name, strerror(errno));
/* Just give up */
break;
}
@@ -238,8 +265,8 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
if (ast_sem_post(&pvt->sem) != 0) {
- ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
- strerror(errno));
+ ast_log(LOG_ERROR, "Taskprocessor '%s': Failed to signal task enqueue: %s\n",
+ listener->tps->name, strerror(errno));
}
}
@@ -258,7 +285,7 @@ static void default_listener_shutdown(struct ast_taskprocessor_listener *listene
/* Hold a reference during shutdown */
ao2_t_ref(listener->tps, +1, "tps-shutdown");
- if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
+ if (ast_taskprocessor_push_internal(listener->tps, default_listener_die, pvt)) {
/* This will cause the thread to exit early without completing tasks already
* in the queue. This is probably the least bad option in this situation. */
default_listener_die(pvt);
@@ -312,7 +339,7 @@ static void tps_shutdown(void)
objcount = ao2_container_count(tps_singletons);
if (objcount > 0) {
ast_log(LOG_DEBUG,
- "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
+ "Taskprocessor shutdown: Waiting for %d taskprocessor(s) to complete.\n",
objcount);
/* give the running taskprocessors a chance to finish, up to
@@ -327,8 +354,8 @@ static void tps_shutdown(void)
delay.tv_sec = 1;
delay.tv_nsec = 0;
ast_log(LOG_DEBUG,
- "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
- objcount);
+ "Taskprocessor shutdown: Still waiting for %d taskprocessor(s) after %d second(s).\n",
+ objcount, tries + 1);
}
}
@@ -336,17 +363,19 @@ static void tps_shutdown(void)
* a taskprocessor was not cleaned up somewhere */
if (objcount > 0) {
ast_log(LOG_ERROR,
- "Assertion may occur, the following taskprocessors are still running:\n");
+ "Taskprocessor shutdown: %d taskprocessor(s) still running after %d seconds. Assertion may occur:\n",
+ objcount, AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT);
sorted_tps = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, tps_sort_cb,
NULL);
if (!sorted_tps || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
- ast_log(LOG_ERROR, "unable to get sorted list of taskprocessors");
+ ast_log(LOG_ERROR, "Unable to get sorted list of taskprocessors for shutdown report\n");
}
else {
iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
while ((tps = ao2_iterator_next(&iter))) {
- ast_log(LOG_ERROR, "taskprocessor '%s'\n", tps->name);
+ ast_log(LOG_ERROR, " - Taskprocessor '%s' (queue size: %ld)\n",
+ tps->name, tps->tps_queue_size);
}
}
@@ -354,7 +383,7 @@ static void tps_shutdown(void)
}
else {
ast_log(LOG_DEBUG,
- "All waiting taskprocessors cleared!\n");
+ "Taskprocessor shutdown: All taskprocessors completed successfully.\n");
}
ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
@@ -370,13 +399,13 @@ int ast_tps_init(void)
tps_singletons = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
TPS_MAX_BUCKETS, tps_hash_cb, NULL, tps_cmp_cb);
if (!tps_singletons) {
- ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
+ ast_log(LOG_ERROR, "Failed to initialize taskprocessor container!\n");
return -1;
}
if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
ao2_ref(tps_singletons, -1);
- ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
+ ast_log(LOG_ERROR, "Failed to initialize taskprocessor subsystems tracking vector!\n");
return -1;
}
@@ -390,43 +419,51 @@ int ast_tps_init(void)
}
/* allocate resources for the task */
-static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
+static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap,
+ const char *file, int line, const char *function)
{
struct tps_task *t;
if (!task_exe) {
- ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ ast_log(LOG_ERROR, "Task callback function is NULL!\n");
return NULL;
}
t = ast_calloc(1, sizeof(*t));
if (!t) {
- ast_log(LOG_ERROR, "failed to allocate task!\n");
+ ast_log(LOG_ERROR, "Failed to allocate memory for task!\n");
return NULL;
}
t->callback.execute = task_exe;
t->datap = datap;
+ t->file = file;
+ t->line = line;
+ t->function = function;
return t;
}
-static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap,
+ const char *file, int line, const char *function)
{
struct tps_task *t;
if (!task_exe) {
- ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ ast_log(LOG_ERROR, "Task callback function is NULL!\n");
return NULL;
}
t = ast_calloc(1, sizeof(*t));
if (!t) {
- ast_log(LOG_ERROR, "failed to allocate task!\n");
+ ast_log(LOG_ERROR, "Failed to allocate memory for task!\n");
return NULL;
}
t->callback.execute_local = task_exe;
t->datap = datap;
t->wants_local = 1;
+ t->file = file;
+ t->line = line;
+ t->function = function;
return t;
}
@@ -520,7 +557,7 @@ static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args
ts.tv_nsec = when.tv_usec * 1000;
ast_mutex_lock(&cli_ping_cond_lock);
- if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
+ if (ast_taskprocessor_push_internal(tps, tps_ping_handler, 0) < 0) {
ast_mutex_unlock(&cli_ping_cond_lock);
ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
ast_taskprocessor_unreference(tps);
@@ -574,8 +611,8 @@ static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
return cmp;
}
-#define FMT_HEADERS "%-70s %10s %10s %10s %10s %10s\n"
-#define FMT_FIELDS "%-70s %10lu %10lu %10lu %10lu %10lu\n"
+#define FMT_HEADERS "%-70s %10s %10s %10s %10s %10s %10s %10s\n"
+#define FMT_FIELDS "%-70s %10lu %10lu %10lu %10lu %10lu %10ld %10ld\n"
/*!
* \internal
@@ -589,7 +626,8 @@ static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocesso
{
ast_cli(fd, FMT_FIELDS, tps->name, tps->stats._tasks_processed_count,
tps->tps_queue_size, tps->stats.max_qsize, tps->tps_queue_low,
- tps->tps_queue_high);
+ tps->tps_queue_high, tps->stats.lowest_time_processed,
+ tps->stats.highest_time_processed);
}
/*!
@@ -667,12 +705,94 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_SHOWUSAGE;
}
- ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
+ ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water", "Low time(us)", "High time(us)");
ast_cli(a->fd, "\n%d taskprocessors\n\n", tps_report_taskprocessor_list(a->fd, like));
return CLI_SUCCESS;
}
+static char *cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ const char *name;
+ struct ast_taskprocessor *tps;
+ struct tps_task *task;
+ int task_count = 0;
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "core show taskprocessor name";
+ e->usage =
+ "Usage: core show taskprocessor name <taskprocessor>\n"
+ " Displays detailed information about a specific taskprocessor,\n"
+ " including all queued tasks and their origins (DEVMODE only).\n";
+ return NULL;
+ case CLI_GENERATE:
+ if (a->pos == 4) {
+ return tps_taskprocessor_tab_complete(a);
+ }
+ return NULL;
+ }
+
+ if (a->argc != 5) {
+ return CLI_SHOWUSAGE;
+ }
+
+ name = a->argv[4];
+ tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS);
+ if (!tps) {
+ ast_cli(a->fd, "\nTaskprocessor '%s' not found\n\n", name);
+ return CLI_SUCCESS;
+ }
+
+ ao2_lock(tps);
+
+ ast_cli(a->fd, "\nTaskprocessor: %s\n", tps->name);
+ ast_cli(a->fd, "===========================================\n");
+ ast_cli(a->fd, "Subsystem: %s\n", tps->subsystem[0] ? tps->subsystem : "(none)");
+ ast_cli(a->fd, "Tasks processed: %lu\n", tps->stats._tasks_processed_count);
+ ast_cli(a->fd, "Current queue size: %ld\n", tps->tps_queue_size);
+ ast_cli(a->fd, "Max queue depth: %lu\n", tps->stats.max_qsize);
+ ast_cli(a->fd, "Low water mark: %ld\n", tps->tps_queue_low);
+ ast_cli(a->fd, "High water mark: %ld\n", tps->tps_queue_high);
+ ast_cli(a->fd, "High water alert: %s\n", tps->high_water_alert ? "Yes" : "No");
+ ast_cli(a->fd, "Suspended: %s\n", tps->suspended ? "Yes" : "No");
+ ast_cli(a->fd, "Currently executing: %s\n", tps->executing ? "Yes" : "No");
+ ast_cli(a->fd, "Highest time (us): %ld\n", tps->stats.highest_time_processed);
+ if (tps->stats.highest_time_task_file) {
+ ast_cli(a->fd, " Highest task origin: %s:%d (%s)\n",
+ tps->stats.highest_time_task_file,
+ tps->stats.highest_time_task_line,
+ tps->stats.highest_time_task_function);
+ }
+ ast_cli(a->fd, "Lowest time (us): %ld\n", tps->stats.lowest_time_processed);
+
+ if (tps->tps_queue_size > 0) {
+ ast_cli(a->fd, "\nQueued Tasks:\n");
+ ast_cli(a->fd, "-------------------------------------------\n");
+
+ AST_LIST_TRAVERSE(&tps->tps_queue, task, list) {
+ task_count++;
+ if (task->file) {
+ ast_cli(a->fd, " Task #%d:\n", task_count);
+ ast_cli(a->fd, " Origin: %s:%d\n", task->file, task->line);
+ ast_cli(a->fd, " Function: %s\n", task->function);
+ ast_cli(a->fd, " Type: %s\n", task->wants_local ? "Local" : "Standard");
+ } else {
+ ast_cli(a->fd, " Task #%d: (origin not available)\n", task_count);
+ }
+ }
+ ast_cli(a->fd, "\nTotal queued tasks: %d\n", task_count);
+ } else {
+ ast_cli(a->fd, "\nNo tasks currently queued.\n");
+ }
+
+ ao2_unlock(tps);
+ ast_taskprocessor_unreference(tps);
+
+ ast_cli(a->fd, "\n");
+ return CLI_SUCCESS;
+}
+
/* hash callback for astobj2 */
static int tps_hash_cb(const void *obj, const int flags)
{
@@ -866,8 +986,8 @@ static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
if (DEBUG_ATLEAST(3)
/* and tps_alert_count becomes zero or non-zero */
&& !old != !tps_alert_count) {
- ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
- tps->name, tps_alert_count ? "triggered" : "cleared");
+ ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert (total alerts: %u).\n",
+ tps->name, tps_alert_count ? "triggered" : "cleared", tps_alert_count);
}
if (tps->subsystem[0] != '\0') {
@@ -1028,11 +1148,12 @@ static void *default_listener_pvt_alloc(void)
pvt = ast_calloc(1, sizeof(*pvt));
if (!pvt) {
+ ast_log(LOG_ERROR, "Failed to allocate memory for taskprocessor listener\n");
return NULL;
}
pvt->poll_thread = AST_PTHREADT_NULL;
if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
- ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
+ ast_log(LOG_ERROR, "Failed to initialize taskprocessor semaphore: %s\n", strerror(errno));
ast_free(pvt);
return NULL;
}
@@ -1098,7 +1219,7 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
static struct ast_taskprocessor *__start_taskprocessor(struct ast_taskprocessor *p)
{
if (p && p->listener->callbacks->start(p->listener)) {
- ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
+ ast_log(LOG_ERROR, "Failed to start taskprocessor listener for '%s'\n",
p->name);
ast_taskprocessor_unreference(p);
@@ -1118,7 +1239,7 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
struct default_taskprocessor_listener_pvt *pvt;
if (ast_strlen_zero(name)) {
- ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
+ ast_log(LOG_ERROR, "Cannot get taskprocessor with empty name!\n");
return NULL;
}
ao2_lock(tps_singletons);
@@ -1212,23 +1333,28 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
int was_empty;
if (!tps) {
- ast_log(LOG_ERROR, "tps is NULL!\n");
+ ast_log(LOG_ERROR, "Taskprocessor is NULL!\n");
return -1;
}
if (!t) {
- ast_log(LOG_ERROR, "t is NULL!\n");
+ ast_log(LOG_ERROR, "Task is NULL!\n");
return -1;
}
+ if (t->file) {
+ ast_debug(3, "Taskprocessor '%s': Task pushed from %s:%d (%s)\n",
+ tps->name, t->file, t->line, t->function);
+ }
+
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
if (tps->tps_queue_high <= tps->tps_queue_size) {
if (!tps->high_water_alert) {
- ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
- tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
+ ast_log(LOG_WARNING, "Taskprocessor '%s' queue reached %ld scheduled tasks (high water mark: %ld)%s.\n",
+ tps->name, tps->tps_queue_size, tps->tps_queue_high, tps->high_water_warned ? " again" : "");
tps->high_water_warned = 1;
tps->high_water_alert = 1;
tps_alert_add(tps, +1);
@@ -1242,14 +1368,26 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
return 0;
}
+int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap,
+ const char *file, int line, const char *function)
+{
+ return taskprocessor_push(tps, tps_task_alloc(task_exe, datap, file, line, function));
+}
+
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
{
- return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
+ return __ast_taskprocessor_push(tps, task_exe, datap, NULL, 0, NULL);
+}
+
+int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap,
+ const char *file, int line, const char *function)
+{
+ return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap, file, line, function));
}
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
{
- return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
+ return __ast_taskprocessor_push_local(tps, task_exe, datap, NULL, 0, NULL);
}
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
@@ -1284,6 +1422,11 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
struct ast_taskprocessor_local local;
struct tps_task *t;
long size;
+ struct timeval start;
+ long elapsed;
+ const char *task_file = NULL;
+ int task_line = 0;
+ const char *task_function = NULL;
ao2_lock(tps);
t = tps_taskprocessor_pop(tps);
@@ -1299,8 +1442,15 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
local.local_data = tps->local_data;
local.data = t->datap;
}
+
+ /* Save task origin info before we free the task */
+ task_file = t->file;
+ task_line = t->line;
+ task_function = t->function;
ao2_unlock(tps);
+ start = ast_tvnow();
+
if (t->wants_local) {
t->callback.execute_local(&local);
} else {
@@ -1324,6 +1474,18 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
if (size >= tps->stats.max_qsize) {
tps->stats.max_qsize = size + 1;
}
+
+ elapsed = ast_tvdiff_us(ast_tvnow(), start);
+ if (elapsed > tps->stats.highest_time_processed) {
+ tps->stats.highest_time_processed = elapsed;
+ tps->stats.highest_time_task_file = task_file;
+ tps->stats.highest_time_task_line = task_line;
+ tps->stats.highest_time_task_function = task_function;
+ }
+ /* 0 is the value tps_reset_stats() stores and means "no sample yet"; seed the minimum from the next measurement */
+ if (!tps->stats.lowest_time_processed || elapsed < tps->stats.lowest_time_processed) {
+ tps->stats.lowest_time_processed = elapsed;
+ }
ao2_unlock(tps);
/* If we executed a task, check for the transition to empty */
@@ -1393,6 +1555,11 @@ static void tps_reset_stats(struct ast_taskprocessor *tps)
ao2_lock(tps);
tps->stats._tasks_processed_count = 0;
tps->stats.max_qsize = 0;
+ tps->stats.highest_time_processed = 0;
+ tps->stats.lowest_time_processed = 0;
+ tps->stats.highest_time_task_file = NULL;
+ tps->stats.highest_time_task_line = 0;
+ tps->stats.highest_time_task_function = NULL;
ao2_unlock(tps);
}
diff --git a/main/threadpool.c b/main/threadpool.c
index 0969e627c3..1a815782f9 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -608,8 +608,7 @@ static int queued_task_pushed(void *data)
* \param listener The taskprocessor listener. The threadpool is the listener's private data
* \param was_empty True if the taskprocessor was empty prior to the task being pushed
*/
-static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
- int was_empty)
+static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
struct task_pushed_data *tpd;
@@ -954,15 +953,27 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
return pool;
}
-int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
+#undef ast_threadpool_push
+#define ast_threadpool_push_internal(pool, task, data) \
+ __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data);
+
+int __ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data,
+ const char *file, int line, const char *function)
{
SCOPED_AO2LOCK(lock, pool);
if (!pool->shutting_down) {
- return ast_taskprocessor_push(pool->tps, task, data);
+ return __ast_taskprocessor_push(pool->tps, task, data, file, line, function);
}
return -1;
}
+/* ABI compatibility: Provide actual function symbol for external modules */
+int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
+{
+ return __ast_threadpool_push(pool, task, data, NULL, 0, NULL);
+}
+
void ast_threadpool_shutdown(struct ast_threadpool *pool)
{
if (!pool) {
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 5b954b2226..6e31084756 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -2096,13 +2096,48 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name)
return ast_sip_create_serializer_group(name, NULL);
}
-int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+#undef ast_sip_push_task
+int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+
+int __ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+ const char *file, int line, const char *function)
{
if (!serializer) {
serializer = ast_serializer_pool_get(sip_serializer_pool);
}
- return ast_taskprocessor_push(serializer, sip_task, task_data);
+ return __ast_taskprocessor_push(serializer, sip_task, task_data, file, line, function);
+}
+
+/* ABI compatibility: Provide actual function symbol for external modules */
+int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+{
+ return __ast_sip_push_task(serializer, sip_task, task_data, NULL, 0, NULL);
+}
+
+/* ABI compatibility: Provide actual function symbols for wait functions */
+#undef ast_sip_push_task_wait_servant
+int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+
+int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+{
+ return __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, NULL, 0, NULL);
+}
+
+#undef ast_sip_push_task_synchronous
+int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+
+int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+{
+ return __ast_sip_push_task_synchronous(serializer, sip_task, task_data, NULL, 0, NULL);
+}
+
+#undef ast_sip_push_task_wait_serializer
+int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+
+int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+{
+ return __ast_sip_push_task_wait_serializer(serializer, sip_task, task_data, NULL, 0, NULL);
}
struct sync_task_data {
@@ -2134,7 +2169,8 @@ static int sync_task(void *data)
return ret;
}
-static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+static int __ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+ const char *file, int line, const char *function)
{
/* This method is an onion */
struct sync_task_data std;
@@ -2145,7 +2181,7 @@ static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*si
std.task = sip_task;
std.task_data = task_data;
- if (ast_sip_push_task(serializer, sync_task, &std)) {
+ if (__ast_sip_push_task(serializer, sync_task, &std, file, line, function)) {
ast_mutex_destroy(&std.lock);
ast_cond_destroy(&std.cond);
return -1;
@@ -2162,21 +2198,24 @@ static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*si
return std.fail;
}
-int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+int __ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+ const char *file, int line, const char *function)
{
if (ast_sip_thread_is_servant()) {
return sip_task(task_data);
}
- return ast_sip_push_task_wait(serializer, sip_task, task_data);
+ return __ast_sip_push_task_wait(serializer, sip_task, task_data, file, line, function);
}
-int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+int __ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+ const char *file, int line, const char *function)
{
- return ast_sip_push_task_wait_servant(serializer, sip_task, task_data);
+ return __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, file, line, function);
}
-int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+int __ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+ const char *file, int line, const char *function)
{
if (!serializer) {
/* Caller doesn't care which PJSIP serializer the task executes under. */
@@ -2195,7 +2234,7 @@ int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int
return sip_task(task_data);
}
- return ast_sip_push_task_wait(serializer, sip_task, task_data);
+ return __ast_sip_push_task_wait(serializer, sip_task, task_data, file, line, function);
}
void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size)
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index 537f77c767..e2e827fb10 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -31,12 +31,15 @@
#include "asterisk.h"
+#include <unistd.h>
+
#include "asterisk/test.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/module.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer.h"
#include "asterisk/threadpool.h"
+#include "asterisk/cli.h"
/*!
* \brief userdata associated with baseline taskprocessor test
@@ -963,6 +966,170 @@ AST_TEST_DEFINE(serializer_pool)
return AST_TEST_PASS;
}
+/*!
+ * \brief Test for CLI command "core show taskprocessor name <name>"
+ *
+ * This test creates a taskprocessor, queues three tasks configured with a wait
+ * time, and verifies that the CLI command reports the taskprocessor and its tasks.
+ */
+AST_TEST_DEFINE(taskprocessor_cli_show)
+{
+ RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+ struct task_data *task_data1 = NULL;
+ struct task_data *task_data2 = NULL;
+ struct task_data *task_data3 = NULL;
+ int task_queued1 = 0, task_queued2 = 0, task_queued3 = 0;
+ char cli_command[128];
+ int cli_output_fd[2];
+ char output_buffer[4096] = {0};
+ ssize_t bytes_read;
+ int res = AST_TEST_FAIL;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "taskprocessor_cli_show";
+ info->category = "/main/taskprocessor/";
+ info->summary = "Test CLI command 'core show taskprocessor'";
+ info->description =
+ "Verifies that the 'core show taskprocessor <name>' CLI command\n"
+ "displays taskprocessor information and queued tasks correctly.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ /* Create a pipe to capture CLI output */
+ if (pipe(cli_output_fd) != 0) {
+ ast_test_status_update(test, "Failed to create pipe for CLI output\n");
+ return AST_TEST_FAIL;
+ }
+
+ /* Create taskprocessor */
+ tps = ast_taskprocessor_get("test_cli_taskprocessor", TPS_REF_DEFAULT);
+ if (!tps) {
+ ast_test_status_update(test, "Unable to create test taskprocessor\n");
+ close(cli_output_fd[0]);
+ close(cli_output_fd[1]);
+ return AST_TEST_FAIL;
+ }
+
+ /* Create tasks that will wait so they stay in the queue */
+ task_data1 = task_data_create();
+ task_data2 = task_data_create();
+ task_data3 = task_data_create();
+
+ if (!task_data1 || !task_data2 || !task_data3) {
+ ast_test_status_update(test, "Unable to create task_data\n");
+ goto cleanup;
+ }
+
+ /* Set a long wait time so tasks stay queued */
+ task_data1->wait_time = 2000; /* 2 seconds */
+ task_data2->wait_time = 2000;
+ task_data3->wait_time = 2000;
+
+ /* Queue the tasks */
+ if (ast_taskprocessor_push(tps, task, task_data1)) {
+ ast_test_status_update(test, "Failed to queue task 1\n");
+ goto cleanup;
+ }
+ task_queued1 = 1;
+
+ if (ast_taskprocessor_push(tps, task, task_data2)) {
+ ast_test_status_update(test, "Failed to queue task 2\n");
+ goto cleanup;
+ }
+ task_queued2 = 1;
+
+ if (ast_taskprocessor_push(tps, task, task_data3)) {
+ ast_test_status_update(test, "Failed to queue task 3\n");
+ goto cleanup;
+ }
+ task_queued3 = 1;
+
+ /* Execute the CLI command */
+ snprintf(cli_command, sizeof(cli_command), "core show taskprocessor name test_cli_taskprocessor");
+
+ if (ast_cli_command(cli_output_fd[1], cli_command) != 0) {
+ ast_test_status_update(test, "CLI command execution failed\n");
+ goto cleanup;
+ }
+
+ /* Close write end and read the output */
+ close(cli_output_fd[1]);
+ cli_output_fd[1] = -1;
+
+ bytes_read = read(cli_output_fd[0], output_buffer, sizeof(output_buffer) - 1);
+ if (bytes_read <= 0) {
+ ast_test_status_update(test, "Failed to read CLI output\n");
+ goto cleanup;
+ }
+ output_buffer[bytes_read] = '\0';
+
+ /* Log the output for inspection */
+ ast_test_status_update(test, "CLI Output:\n%s\n", output_buffer);
+
+ /* Verify the output contains expected information */
+ if (!strstr(output_buffer, "test_cli_taskprocessor")) {
+ ast_test_status_update(test, "Output missing taskprocessor name\n");
+ goto cleanup;
+ }
+
+ if (!strstr(output_buffer, "Current queue size")) {
+ ast_test_status_update(test, "Output missing queue size information\n");
+ goto cleanup;
+ }
+
+ /* Check for queued tasks section (at least one task should be shown) */
+ if (!strstr(output_buffer, "Queued Tasks") && !strstr(output_buffer, "Currently executing")) {
+ ast_test_status_update(test, "Output missing queued tasks or execution status\n");
+ goto cleanup;
+ }
+
+ /* Verify we see task information */
+ if (!strstr(output_buffer, "Task #")) {
+ ast_test_status_update(test, "Output missing task information\n");
+ goto cleanup;
+ }
+
+ ast_test_status_update(test, "CLI command output validated successfully\n");
+ res = AST_TEST_PASS;
+
+cleanup:
+
+ ast_test_status_update(test, "Waiting for tasks to complete\n");
+
+ /* Wait for tasks to complete */
+ if (task_data1) {
+ if (task_queued1) {
+ task_wait(task_data1);
+ }
+ ao2_cleanup(task_data1);
+ }
+ if (task_data2) {
+ if (task_queued2) {
+ task_wait(task_data2);
+ }
+ ao2_cleanup(task_data2);
+ }
+ if (task_data3) {
+ if (task_queued3) {
+ task_wait(task_data3);
+ }
+ ao2_cleanup(task_data3);
+ }
+
+ if (cli_output_fd[0] >= 0) {
+ close(cli_output_fd[0]);
+ }
+ if (cli_output_fd[1] >= 0) {
+ close(cli_output_fd[1]);
+ }
+
+ ast_test_status_update(test, "Tasks complete\n");
+ return res;
+}
+
static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
@@ -972,6 +1139,7 @@ static int unload_module(void)
ast_test_unregister(taskprocessor_shutdown);
ast_test_unregister(taskprocessor_push_local);
ast_test_unregister(serializer_pool);
+ ast_test_unregister(taskprocessor_cli_show);
return 0;
}
@@ -984,6 +1152,7 @@ static int load_module(void)
ast_test_register(taskprocessor_shutdown);
ast_test_register(taskprocessor_push_local);
ast_test_register(serializer_pool);
+ ast_test_register(taskprocessor_cli_show);
return AST_MODULE_LOAD_SUCCESS;
}