FFmpegfs Fuse Multi Media Filesystem 2.19
Loading...
Searching...
No Matches
thread_pool.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 2019-2026 Norbert Schlia (nschlia@oblivion-software.de)
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
17 *
18 * On Debian systems, the complete text of the GNU General Public License
19 * Version 3 can be found in `/usr/share/common-licenses/GPL-3'.
20 */
21
32#include "thread_pool.h"
33#include "logging.h"
34#include "config.h"
35
36thread_pool::thread_pool(unsigned int num_threads)
37 : m_queue_shutdown(false)
38 , m_num_threads(num_threads)
39 , m_cur_threads(0)
40 , m_threads_running(0)
41{
42}
43
48
50{
51 tp.loop_function();
52}
53
55{
56 unsigned int thread_no = ++m_cur_threads;
57
58 while (true)
59 {
60 FunctionPointer info;
61 {
62 std::unique_lock<std::mutex> lock_queue_mutex(m_queue_mutex);
63 m_queue_cond.wait(lock_queue_mutex, [this]{ return (!m_thread_queue.empty() || m_queue_shutdown); });
64
66 {
67 break;
68 }
69
70 Logging::trace(nullptr, "Starting job taking pool thread no. %1 with id 0x%<" FFMPEGFS_FORMAT_PTHREAD_T ">2.", thread_no, pthread_self());
71 info = m_thread_queue.front();
72 m_thread_queue.pop();
73 }
74
75 int ret = info();
76
77 Logging::trace(nullptr, "The job using pool thread no. %1 with id 0x%<" FFMPEGFS_FORMAT_PTHREAD_T ">2 has exited with return code %3.", thread_no, pthread_self(), ret);
78 }
79}
80
82{
84 {
85 Logging::trace(nullptr, "Queuing up a new thread. There are %1 threads in the pool.", m_thread_pool.size());
86
87 {
88 std::lock_guard<std::mutex> lock_queue_mutex(m_queue_mutex);
89
90 m_thread_queue.push(func);
91 }
92
93 m_queue_cond.notify_one();
94
95 return true;
96 }
97 else
98 {
99 return false;
100 }
101}
102
104{
105 return m_threads_running;
106}
107
109{
110 std::lock_guard<std::mutex> lock_queue_mutex(m_queue_mutex);
111
112 return static_cast<unsigned int>(m_thread_queue.size());
113}
114
115unsigned int thread_pool::pool_size() const
116{
117 return static_cast<unsigned int>(m_thread_pool.size());
118}
119
120int thread_pool::init(unsigned int num_threads /*= 0*/)
121{
122 if (!m_thread_pool.empty())
123 {
124 Logging::warning(nullptr, "The thread pool already initialised");
125 return 0;
126 }
127
128 if (num_threads)
129 {
130 m_num_threads = num_threads;
131 }
132
133 Logging::info(nullptr, "The thread pool is being initialised with a maximum of %1 threads.", m_num_threads);
134
135 for(unsigned int i = 0; i < m_num_threads; i++)
136 {
137 m_thread_pool.emplace_back(std::thread(&thread_pool::loop_function_starter, std::ref(*this)));
138 }
139
140 return static_cast<int>(m_thread_pool.size());
141}
142
143void thread_pool::tear_down(bool silent)
144{
145 if (!silent)
146 {
147 Logging::debug(nullptr, "Tearing down the thread pool. There are %1 threads still in the pool.", m_thread_queue.size());
148 }
149
150 {
151 std::lock_guard<std::mutex> lk(m_queue_mutex);
152 m_queue_shutdown = true;
153 }
154 m_queue_cond.notify_all();
155
156 while (!m_thread_pool.empty())
157 {
158 m_thread_pool.back().join();
159 m_thread_pool.pop_back();
160 }
161}
162
static void warning(const T filename, const std::string &format_string, Args &&...args)
Write warning level log entry.
Definition logging.h:220
static void debug(const T filename, const std::string &format_string, Args &&...args)
Write debug level log entry.
Definition logging.h:182
static void trace(const T filename, const std::string &format_string, Args &&...args)
Write trace level log entry.
Definition logging.h:163
static void info(const T filename, const std::string &format_string, Args &&...args)
Write info level log entry.
Definition logging.h:201
The thread_pool class.
Definition thread_pool.h:55
int init(unsigned int num_threads=0)
Initialise thread pool. Initialise the thread pool. Does nothing if called more than once.
std::condition_variable m_queue_cond
unsigned int pool_size() const
Get current pool size.
unsigned int m_num_threads
std::function< int(void)> FunctionPointer
Pointer to thread pool function.
Definition thread_pool.h:57
void loop_function()
Start loop function.
std::mutex m_queue_mutex
std::vector< std::thread > m_thread_pool
std::queue< FunctionPointer > m_thread_queue
static void loop_function_starter(thread_pool &tp)
Start loop function.
void tear_down(bool silent=false)
Shut down the thread pool.
unsigned int m_cur_threads
bool schedule_thread(FunctionPointer &&func)
Schedule a new thread from pool.
virtual ~thread_pool()
Object destructor. Ends all threads and cleans up resources.
std::atomic_uint32_t m_threads_running
unsigned int current_queued()
Get number of currently queued threads.
thread_pool(unsigned int num_threads=std::thread::hardware_concurrency() *4)
Construct a thread_pool object.
std::atomic_bool m_queue_shutdown
unsigned int current_running() const
Get number of currently running threads.
std::unique_ptr< thread_pool > tp
Thread pool object.
Definition fuseops.cc:102
Provide various log facilities to stderr, disk or syslog.
Thread pool class implementation.