FFmpegfs Fuse Multi Media Filesystem 2.16
thread_pool.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 2019-2024 Norbert Schlia (nschlia@oblivion-software.de)
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
17 *
18 * On Debian systems, the complete text of the GNU General Public License
19 * Version 3 can be found in `/usr/share/common-licenses/GPL-3'.
20 */
21
32#include "thread_pool.h"
33#include "logging.h"
34#include "config.h"
35
36thread_pool::thread_pool(unsigned int num_threads)
37 : m_queue_shutdown(false)
38 , m_num_threads(num_threads)
39 , m_cur_threads(0)
40 , m_threads_running(0)
41{
42}
43
45{
46 tear_down(true);
47}
48
50{
51 tp.loop_function();
52}
53
55{
56 unsigned int thread_no = ++m_cur_threads;
57
58 while (true)
59 {
60 FunctionPointer info;
61 {
62 std::unique_lock<std::mutex> lock_queue_mutex(m_queue_mutex);
63 m_queue_cond.wait(lock_queue_mutex, [this]{ return (!m_thread_queue.empty() || m_queue_shutdown); });
64
66 {
67 lock_queue_mutex.unlock();
68 break;
69 }
70
71 Logging::trace(nullptr, "Starting job taking pool thread no. %1 with id 0x%<" FFMPEGFS_FORMAT_PTHREAD_T ">2.", thread_no, pthread_self());
72 info = m_thread_queue.front();
73 m_thread_queue.pop();
74 }
75
76 int ret = info();
77
78 Logging::trace(nullptr, "The job using pool thread no. %1 with id 0x%<" FFMPEGFS_FORMAT_PTHREAD_T ">2 has exited with return code %3.", thread_no, pthread_self(), ret);
79 }
80}
81
83{
85 {
86 Logging::trace(nullptr, "Queuing up a new thread. There are %1 threads in the pool.", m_thread_pool.size());
87
88 {
89 std::lock_guard<std::mutex> lock_queue_mutex(m_queue_mutex);
90
91 m_thread_queue.push(func);
92 }
93
94 m_queue_cond.notify_one();
95
96 return true;
97 }
98 else
99 {
100 return false;
101 }
102}
103
105{
106 return m_threads_running;
107}
108
110{
111 std::lock_guard<std::mutex> lock_queue_mutex(m_queue_mutex);
112
113 return static_cast<unsigned int>(m_thread_queue.size());
114}
115
116unsigned int thread_pool::pool_size() const
117{
118 return static_cast<unsigned int>(m_thread_pool.size());
119}
120
121int thread_pool::init(unsigned int num_threads /*= 0*/)
122{
123 if (!m_thread_pool.empty())
124 {
125 Logging::warning(nullptr, "The thread pool already initialised");
126 return 0;
127 }
128
129 if (num_threads)
130 {
131 m_num_threads = num_threads;
132 }
133
134 Logging::info(nullptr, "The thread pool is being initialised with a maximum of %1 threads.", m_num_threads);
135
136 for(unsigned int i = 0; i < m_num_threads; i++)
137 {
138 m_thread_pool.emplace_back(std::thread(&thread_pool::loop_function_starter, std::ref(*this)));
139 }
140
141 return static_cast<int>(m_thread_pool.size());
142}
143
144void thread_pool::tear_down(bool silent)
145{
146 if (!silent)
147 {
148 Logging::debug(nullptr, "Tearing down the thread pool. There are %1 threads still in the pool.", m_thread_queue.size());
149 }
150
151 m_queue_mutex.lock();
152 m_queue_shutdown = true;
153 m_queue_mutex.unlock();
154 m_queue_cond.notify_all();
155
156 while (!m_thread_pool.empty())
157 {
158 m_thread_pool.back().join();
159 m_thread_pool.pop_back();
160 }
161}
162
static void warning(const T filename, const std::string &format_string, Args &&...args)
Write warning level log entry.
Definition: logging.h:220
static void debug(const T filename, const std::string &format_string, Args &&...args)
Write debug level log entry.
Definition: logging.h:182
static void trace(const T filename, const std::string &format_string, Args &&...args)
Write trace level log entry.
Definition: logging.h:163
static void info(const T filename, const std::string &format_string, Args &&...args)
Write info level log entry.
Definition: logging.h:201
The thread_pool class.
Definition: thread_pool.h:55
int init(unsigned int num_threads=0)
Initialise thread pool. Initialise the thread pool. Does nothing if called more than once.
Definition: thread_pool.cc:121
std::condition_variable m_queue_cond
Definition: thread_pool.h:118
unsigned int pool_size() const
Get current pool size.
Definition: thread_pool.cc:116
unsigned int m_num_threads
Definition: thread_pool.h:121
std::function< int(void)> FunctionPointer
Pointer to thread pool function.
Definition: thread_pool.h:57
void loop_function()
Start loop function.
Definition: thread_pool.cc:54
std::mutex m_queue_mutex
Definition: thread_pool.h:117
std::vector< std::thread > m_thread_pool
Definition: thread_pool.h:116
std::queue< FunctionPointer > m_thread_queue
Definition: thread_pool.h:119
static void loop_function_starter(thread_pool &tp)
Start loop function.
Definition: thread_pool.cc:49
void tear_down(bool silent=false)
Shut down the thread pool.
Definition: thread_pool.cc:144
unsigned int m_cur_threads
Definition: thread_pool.h:122
bool schedule_thread(FunctionPointer &&func)
Schedule a new thread from pool.
Definition: thread_pool.cc:82
virtual ~thread_pool()
Object destructor. Ends all threads and cleans up resources.
Definition: thread_pool.cc:44
std::atomic_uint32_t m_threads_running
Definition: thread_pool.h:123
unsigned int current_queued()
Get number of currently queued threads.
Definition: thread_pool.cc:109
thread_pool(unsigned int num_threads=std::thread::hardware_concurrency() *4)
Construct a thread_pool object.
Definition: thread_pool.cc:36
std::atomic_bool m_queue_shutdown
Definition: thread_pool.h:120
unsigned int current_running() const
Get number of currently running threads.
Definition: thread_pool.cc:104
std::unique_ptr< thread_pool > tp
Thread pool object.
Definition: fuseops.cc:102
Provide various log facilities to stderr, disk or syslog.
Thread pool class implementation.