winix/winixd/core/threadmanager.cpp

208 lines
5.1 KiB
C++

/*
* This file is a part of Winix
* and is distributed under the 2-Clause BSD licence.
* Author: Tomasz Sowa <t.sowa@ttmath.org>
*/
/*
* Copyright (c) 2011-2019, Tomasz Sowa
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <signal.h>
#include "threadmanager.h"
#include "log.h"
namespace Winix
{
ThreadManager::ThreadManager()
{
were_started = false;
}
//void ThreadManager::SetSynchro(Synchro * psynchro)
//{
// synchro = psynchro;
//}
void ThreadManager::Init()
{
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGTERM);
sigaddset(&set, SIGINT);
// blocking SIGTERM and SIGINT
// new threads will have the signals blocked too
pthread_sigmask(SIG_BLOCK, &set, 0);
}
void ThreadManager::Add(BaseThread * pbase, const wchar_t * thread_name)
{
thread_tab.emplace_back();
ThreadItem & item = thread_tab.back();
item.thread_item_data = new ThreadItemData();
ThreadItemData & data = *item.thread_item_data;
item.object = pbase;
item.name = thread_name;
item.object->set_dependency(this);
// main log buffer (from the main thread)
item.object->set_main_log_buffer(log.GetLogBuffer());
item.object->set_main_file_log(log.GetFileLog());
// the logger buffer and model_connector are different
item.object->set_log_buffer(&data.log_buffer);
//data.postgresql_connector.set_logger(logger);
data.postgresql_connector.set_conn_param(config->db_database, config->db_user, config->db_pass);
data.postgresql_connector.set_logger(item.object->get_logger());
data.postgresql_connector.set_log_queries(config->log_db_query);
data.postgresql_connector.wait_for_connection();
data.model_connector.set_db_connector(data.postgresql_connector);
data.model_connector.set_flat_connector(data.json_connector);
data.model_connector.set_winix_config(config);
data.model_connector.set_winix_logger(item.object->get_logger());
data.model_connector.set_logger(item.object->get_logger());
//data.model_connector.set_winix_request();
item.object->set_model_connector(&data.model_connector);
if( were_started )
{
Start(thread_tab.size() - 1, &thread_tab.back());
}
else
{
log << log4 << "TM: added a thread to the queue, number: " << (thread_tab.size()-1)
<< ", name: " << thread_name << logend;
}
}
void ThreadManager::Add(BaseThread & pbase, const wchar_t * thread_name)
{
Add(&pbase, thread_name);
}
void ThreadManager::Add(BaseThread * pbase, const std::wstring & thread_name)
{
Add(pbase, thread_name.c_str());
}
void ThreadManager::Add(BaseThread & pbase, const std::wstring & thread_name)
{
Add(&pbase, thread_name.c_str());
}
void ThreadManager::StartAll()
{
Winix::Lock lock(synchro);
int id = 0;
for(ThreadItem & item : thread_tab)
{
Start(id, &item);
id += 1;
}
lock.Unlock();
were_started = true;
}
void ThreadManager::Start(int id, ThreadItem * item)
{
if( item->object->StartThread() )
{
log << log4 << "TM: thread " << id << " (" << item->object->ThreadId() << ", name: "
<< item->name << ") started" << logend;
}
else
{
log << log4 << "TM: cannot run a thread " << id << ", name: " << item->name << logend;
}
}
void ThreadManager::StopAll()
{
if( !were_started )
return;
{
Winix::Lock lock(synchro);
for(ThreadItem & item : thread_tab)
{
// WakeUpThread() should be used with Lock/Unlock
item.object->WakeUpThread();
}
}
int id = 0;
for(ThreadItem & item : thread_tab)
{
log << log4 << "TM: waiting for thread " << id << " (" << item.object->ThreadId()
<< ", name: " << item.name << ")" << logend << logsave;
item.object->WaitForThread();
log << log4 << "TM: thread " << id << " terminated" << logend << logsave;
// the thread is stopped and we can set the thread log buffer pointing to
// the main log buffer (from the main thread)
item.object->set_log_buffer(log.GetLogBuffer());
delete item.thread_item_data;
item.thread_item_data = nullptr;
id += 1;
}
thread_tab.clear();
}
} // namespace Winix