winix/winixd/core/job.cpp

410 lines
9.0 KiB
C++

/*
* This file is a part of Winix
* and is distributed under the 2-Clause BSD licence.
* Author: Tomasz Sowa <t.sowa@ttmath.org>
*/
/*
* Copyright (c) 2012-2022, Tomasz Sowa
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "job.h"
#include "plugin.h"
#include "log.h"
#include "functions/functions.h"
#include "lock.h"
namespace Winix
{
/*
 * Creates a job manager with no collaborators attached.
 *
 * All collaborator pointers start as nullptr and have to be provided
 * through the Set...() methods. One queue is created for every priority
 * level in the range 0 .. PRIORITY_HIGHEST.
 */
Job::Job()
{
	// collaborators are injected later with the Set...() methods
	req_tab   = nullptr;
	load_avg  = nullptr;
	mounts    = nullptr;
	functions = nullptr;
	cur       = nullptr;

	// one queue per priority level (PRIORITY_HIGHEST is a valid index)
	jobs_queue_tab.resize(PRIORITY_HIGHEST + 1);
}
/*
 * Stores the pointer to the Cur structure used by this object.
 * Only the pointer is kept, the structure itself is not copied.
 */
void Job::SetCur(Cur * cur)
{
	Job::cur = cur;
}
/*
 * Stores the pointer to the Functions object used by this object.
 * Only the pointer is kept, the object itself is not copied.
 */
void Job::SetFunctions(Functions * functions)
{
	Job::functions = functions;
}
/*
 * Stores the pointer to the LoadAvg object used by this object.
 * Only the pointer is kept, the object itself is not copied.
 */
void Job::SetLoadAvg(LoadAvg * load_avg)
{
	Job::load_avg = load_avg;
}
/*
 * Stores the pointer to the Mounts object used by this object.
 * Only the pointer is kept, the object itself is not copied.
 */
void Job::SetMounts(Mounts * mounts)
{
	Job::mounts = mounts;
}
/*
 * Stores the pointer to the list of requests
 * (the list from which RemoveOldRequest() erases finished requests).
 * Only the pointer is kept, the list itself is not copied.
 */
void Job::SetReqTab(std::list<Request> * req_tab)
{
	Job::req_tab = req_tab;
}
/*
 * Clamps the given priority to the highest supported value.
 *
 * priority - in/out parameter; values greater than PRIORITY_HIGHEST
 *            are replaced with PRIORITY_HIGHEST, other values are left untouched
 */
void Job::CheckPriority(size_t & priority) const
{
	const bool is_too_high = (priority > PRIORITY_HIGHEST);

	if( is_too_high )
	{
		priority = PRIORITY_HIGHEST;
	}
}
/*
 * first thread (objects locked)
 *
 * Adds a default job (JOB_TYPE_DEFAULT, JOB_ID_DEFAULT) to the queue.
 *
 * job      - job description, a copy of it is stored in the queued task
 * priority - queue priority, values above PRIORITY_HIGHEST are clamped
 *
 * After the task has been queued WakeUpThread() is called.
 */
void Job::Add(pt::Space & job, size_t priority)
{
	CheckPriority(priority);

	JobTask new_task;
	new_task.job      = job;
	new_task.job_id   = JobTask::JOB_ID_DEFAULT;
	new_task.job_type = JobTask::JOB_TYPE_DEFAULT;

	auto & queue = jobs_queue_tab[priority];
	queue.push(new_task);

	WakeUpThread();
}
/*
 * first thread (objects locked)
 *
 * Adds a request continuation job (JOB_TYPE_REQUEST_CONTINUATION, JOB_ID_DEFAULT)
 * to the queue.
 *
 * request  - the request which will be continued when the job is done,
 *            only the pointer is stored in the queued task
 * job      - job description, a copy of it is stored in the queued task
 * priority - queue priority, values above PRIORITY_HIGHEST are clamped
 *
 * After the task has been queued WakeUpThread() is called.
 */
void Job::Add(Request * request, pt::Space & job, size_t priority)
{
	CheckPriority(priority);

	JobTask new_task;
	new_task.request  = request;
	new_task.job      = job;
	new_task.job_id   = JobTask::JOB_ID_DEFAULT;
	new_task.job_type = JobTask::JOB_TYPE_REQUEST_CONTINUATION;

	auto & queue = jobs_queue_tab[priority];
	queue.push(new_task);

	WakeUpThread();
}
// first thread (objects locked)
void Job::Add(long job_id, pt::Space & job, size_t priority)
{
CheckPriority(priority);
JobTask task;
task.job_type = JobTask::JOB_TYPE_DEFAULT;
task.job_id = job_id;
task.job = job;
jobs_queue_tab[priority].push(task);
WakeUpThread();
}
// first thread (objects locked)
void Job::Add(long job_id, Request * request, pt::Space & job, size_t priority)
{
CheckPriority(priority);
JobTask task;
task.job_type = JobTask::JOB_TYPE_REQUEST_CONTINUATION;
task.job_id = job_id;
task.request = request;
task.job = job;
jobs_queue_tab[priority].push(task);
WakeUpThread();
}
/*
 * Returns how many tasks are waiting in the queue with the given priority.
 * A priority above PRIORITY_HIGHEST is treated as PRIORITY_HIGHEST.
 */
size_t Job::Size(size_t priority) const
{
	CheckPriority(priority);

	const auto & queue = jobs_queue_tab[priority];
	return queue.size();
}
/*
 * Returns how many tasks are waiting in all queues together
 * (priorities 0 .. PRIORITY_HIGHEST).
 */
size_t Job::Size() const
{
	size_t total = 0;
	size_t priority = 0;

	while( priority <= PRIORITY_HIGHEST )
	{
		total += Size(priority);
		++priority;
	}

	return total;
}
/*
 * Returns true if there are no tasks in the queue with the given priority.
 * A priority above PRIORITY_HIGHEST is treated as PRIORITY_HIGHEST.
 */
bool Job::Empty(size_t priority) const
{
	CheckPriority(priority);

	const auto & queue = jobs_queue_tab[priority];
	return queue.empty();
}
/*
 * Returns true only if every queue (priorities 0 .. PRIORITY_HIGHEST) is empty.
 * The scan stops at the first non-empty queue.
 */
bool Job::Empty() const
{
	size_t priority = 0;

	// skip over the empty queues, starting from the lowest priority
	while( priority <= PRIORITY_HIGHEST && Empty(priority) )
	{
		++priority;
	}

	// all queues were skipped so none of them holds a task
	return priority > PRIORITY_HIGHEST;
}
/*
second thread
*/
/*
 * second thread (objects locked)
 *
 * Returns true if at least one queue holds a task.
 */
bool Job::SignalReceived()
{
	const bool all_queues_empty = Empty();

	return !all_queues_empty;
}
/*
 * second thread (objects not locked)
 *
 * Processes the queues starting from PRIORITY_HIGHEST down to 0.
 * A queue is processed as long as it holds tasks, then the next (lower)
 * priority is taken. The whole processing stops when IsExitSignal() returns true.
 */
void Job::Do()
{
	for(size_t priority = PRIORITY_HIGHEST + 1 ; priority-- > 0 && !IsExitSignal() ; )
	{
		for(;;)
		{
			bool queue_is_empty;

			{
				// the queues are shared with the first thread so check them under the lock
				Winix::Lock lock(synchro);
				queue_is_empty = Empty(priority);
			}

			if( queue_is_empty )
				break;

			DoQueue(jobs_queue_tab[priority], priority);

			if( IsExitSignal() )
				break;
		}
	}
}
/*
 * second thread (objects not locked, jobs_queue is expected to be not empty)
 *
 * Runs the tasks from the given queue one by one until the queue is empty
 * or IsExitSignal() returns true.
 *
 * jobs_queue - the queue to process
 * priority   - priority of the queue, passed on to DoJob()
 *
 * A task stays at the front of the queue while it is being run and is removed
 * only after DoJob() has returned. The queue is touched only under the lock,
 * the job itself is run without the lock.
 */
void Job::DoQueue(JobsQueue & jobs_queue, size_t priority)
{
	// initialized so that the loop condition never reads an indeterminate value,
	// previously it was left unset when the queue turned out to be empty
	bool is_empty = true;

	do
	{
		// references will not be invalidated after insertion to jobs_queue
		// (jobs_queue is std::queue and it uses std::deque by default)
		JobTask * job_task = nullptr;

		{
			Winix::Lock lock(synchro);
			is_empty = jobs_queue.empty();

			if( !is_empty )
			{
				job_task = &jobs_queue.front();
			}
		}

		if( job_task )
		{
			DoJob(*job_task, priority);

			{
				Winix::Lock lock(synchro);
				jobs_queue.pop();
				is_empty = jobs_queue.empty();
			}
		}
	}
	while( !is_empty && !IsExitSignal() );
}
/*
 * second thread (objects not locked)
 *
 * Runs a single task by sending the WINIX_JOB message to the plugins.
 *
 * task     - the task to run
 * priority - priority of the queue the task comes from, it is passed to
 *            DoRequestContinuationJob() (used there if the request is queued again)
 *
 * For a JOB_TYPE_REQUEST_CONTINUATION task the plugins get a local Cur structure
 * built from the task's request, and after the plugins have returned the request
 * is continued (under the lock) with DoRequestContinuationJob().
 * For other tasks the plugins are called without a Cur structure.
 *
 * If no plugin reports success (res.res_true == 0) DoWinixJob() is called.
 * All exceptions are caught here and only logged.
 */
void Job::DoJob(JobTask & task, size_t priority)
{
	PluginRes res;

	try
	{
		main_log << logsave;

		if( task.job_type == JobTask::JOB_TYPE_REQUEST_CONTINUATION )
		{
			if( task.request )
			{
				Cur local_cur;

				// is this correct? we can read the task.request->session ... from the second thread?
				// probably yes, because this structure stays on the stack specifically for the job to use
				local_cur.request = task.request;
				local_cur.session = task.request->session;
				local_cur.mount = task.request->mount;

				// the state is set on the request which belongs to this task;
				// at this point the shared cur->request has not been switched to the task's
				// request yet (this is done later under the lock) and we are not holding
				// the lock, so cur->request must not be used here
				local_cur.request->run_state = Request::RunState::job_run;

				res = plugin->Call(model_connector, &log, &local_cur, WINIX_JOB, &task.job, nullptr, task.job_type, task.job_id);
				log << logsave;

				{
					Winix::Lock lock(synchro);

					// winix templates functions uses its own cur (global pointer in TemplatesFunctions namespace)
					// so we have to set it correctly
					cur->request = local_cur.request;
					cur->session = local_cur.session;
					cur->mount = local_cur.mount;

					DoRequestContinuationJob(task, priority);
				}
			}
			else
			{
				log << log2 << "Job: request continuation task doesn't have a request set, skipping the job and request continuation" << logend;
				log << log2 << "Job: this is an internal error, the request if exists in the queue will never be removed" << logend;
			}
		}
		else
		{
			res = plugin->Call(model_connector, &log, nullptr, WINIX_JOB, &task.job, nullptr, task.job_type, task.job_id);
		}

		log << logsave;

		if( res.res_true == 0 )
		{
			DoWinixJob(task.job); // probably this will be removed
		}
	}
	catch(...)
	{
		// NOTE(review): if the exception comes from a request continuation task
		// DoRequestContinuationJob() may have been skipped - confirm that such a request is finished elsewhere
		log << log2 << "Job: an exception was catched when doing a job" << logend;
	}
}
/*
 * second thread (objects locked)
 * use main_log (after locking) for the logs to be in the correct order
 *
 * Continues the request pointed to by cur->request after its job has been done.
 *
 * job_task - the task which has just been run (not read in this method)
 * priority - priority used when the request is put to the job queue again
 *
 * Steps:
 * - if the request has a function set, the directories are parsed again,
 *   the mount point is recalculated and functions->ContinueMakeFunction() is called;
 *   if the request ends up in the assigned_to_job state it is added to the job queue again
 * - if the directories cannot be parsed or the request has no function,
 *   the http status is set to 500 and the request is prepared to finish
 * - every request which has not been queued again is finished, cleared
 *   and removed from the requests' list
 */
void Job::DoRequestContinuationJob(JobTask & job_task, size_t priority)
{
	if( cur->request->function )
	{
		main_log << config->log_delimiter << logend;
		main_log << log3 << "Job: making a continuation for request " << cur->request << logend;
		main_log << log4 << "Job: doing directory analysis again" << logend;

		if( functions->ParseOnlyDirs() )
		{
			cur->mount = mounts->CalcCurMount(cur->request);
			cur->request->mount = cur->mount;
			cur->request->run_state = Request::RunState::job_continuation_run;
			functions->ContinueMakeFunction();

			if( cur->request->run_state == Request::RunState::assigned_to_job )
			{
				// main_log is used here as well (it was 'log' before) so that this message
				// is put in the same order as the other messages from this method
				main_log << log3 << "Job: this request (" << cur->request << ") has been moved to the job queue again" << logend;
				Add(cur->request->job_id, cur->request, cur->request->job, priority);
			}
		}
		else
		{
			main_log << log4 << "Job: directories have not been correctly prepared, finishing the request" << logend;
			cur->request->http_status = Header::status_500_internal_server_error;
			cur->request->run_state = Request::RunState::prepare_to_finish;
		}
	}
	else
	{
		main_log << log2 << "Job: request continuation task doesn't have a funtion set, return 500 internal error" << logend;
		cur->request->http_status = Header::status_500_internal_server_error;
		cur->request->run_state = Request::RunState::prepare_to_finish;
	}

	if( cur->request->run_state != Request::RunState::assigned_to_job )
	{
		cur->request->FinishRequest(); // if cur->request->function were null then templates functions would not work
		load_avg->StopRequest(cur->request);
		cur->request->Clear();
		cur->request->run_state = Request::RunState::finished;

		// NOTE(review): RemoveOldRequest() erases the request from the list and cur->request
		// is not changed afterwards in this method - confirm that it is reset before the next use
		RemoveOldRequest(cur->request);
	}

	main_log << logendrequest;
}
// second thread (objects not locked)
// Called from DoJob() when no plugin reported success for the job (res.res_true == 0).
// Currently it does nothing: the only statement in the body is commented out,
// the 'job' parameter is not used.
void Job::DoWinixJob(pt::Space & job)
{
//log << log1 << "standard winix job: " << job.Text(L"type") << logend;
}
/*
 * second thread (objects locked)
 * use main_log for the logs to be in the correct order
 *
 * Erases the given request from the requests' list (req_tab).
 * The request is searched by address; if it is not in the list nothing is done.
 */
void Job::RemoveOldRequest(Request * request)
{
	for(std::list<Request>::iterator it = req_tab->begin() ; it != req_tab->end() ; ++it)
	{
		if( &(*it) != request )
			continue;

		main_log << log3 << "Job: removing request " << request << logend;
		req_tab->erase(it);

		// the iterator is no longer valid after erase() so leave immediately
		return;
	}
}
} // namespace Winix