allow a request to be processed in a job

Now we allow a request to be passed into a job queue,
and after the job finishes the request is passed into
a controller again. In order to achieve this we have
a requests queue in System, when we put a request
to the job this Request structure is preserved in the
queue and for a new request a new Request is added to
the queue.

while here:
- remove App::Lock()/Unlock(), use scoped locking
- fix: Plugin now has a Call method which takes ModelConnector
  and a logger (used in multithreaded environment)
- BaseThread has a main_model_connector pointer
  to the main (from the main thread) model connector
- the FastCGI structure fcgi_request moved from App to Request
- some methods for handling requests moved from App to Request
- small refactoring in main.cpp
- add Http class (a http client)
This commit is contained in:
2022-07-25 14:21:21 +02:00
parent b2d92b85a0
commit 979ef907fe
65 changed files with 7018 additions and 4437 deletions

View File

@@ -5,7 +5,7 @@
*/
/*
* Copyright (c) 2012-2019, Tomasz Sowa
* Copyright (c) 2012-2022, Tomasz Sowa
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,6 +35,9 @@
#include "job.h"
#include "plugin.h"
#include "log.h"
#include "functions/functions.h"
#include "lock.h"
namespace Winix
@@ -45,31 +48,110 @@ namespace Winix
Job::Job()
{
jobs_queue_tab.resize(WINIX_JOBS_HOW_MANY_PRIORITIES);
jobs_queue_tab.resize(PRIORITY_HIGHEST + 1);
cur = nullptr;
functions = nullptr;
mounts = nullptr;
load_avg = nullptr;
req_tab = nullptr;
}
void Job::CheckPriority(int & priority) const
void Job::SetCur(Cur * cur)
{
if( priority < 0 )
priority = 0;
this->cur = cur;
}
if( priority >= WINIX_JOBS_HOW_MANY_PRIORITIES )
priority = WINIX_JOBS_HOW_MANY_PRIORITIES - 1;
void Job::SetFunctions(Functions * functions)
{
this->functions = functions;
}
void Job::SetLoadAvg(LoadAvg * load_avg)
{
this->load_avg = load_avg;
}
void Job::SetMounts(Mounts * mounts)
{
this->mounts = mounts;
}
void Job::SetReqTab(std::list<Request> * req_tab)
{
this->req_tab = req_tab;
}
void Job::CheckPriority(size_t & priority) const
{
if( priority > PRIORITY_HIGHEST )
priority = PRIORITY_HIGHEST;
}
// first thread (objects locked)
void Job::Add(pt::Space & job, int priority)
void Job::Add(pt::Space & job, size_t priority)
{
CheckPriority(priority);
jobs_queue_tab[priority].push(job);
JobTask task;
task.job_type = JobTask::JOB_TYPE_DEFAULT;
task.job_id = JobTask::JOB_ID_DEFAULT;
task.job = job;
jobs_queue_tab[priority].push(task);
WakeUpThread();
}
// first thread (objects locked)
void Job::Add(Request * request, pt::Space & job, size_t priority)
{
CheckPriority(priority);
JobTask task;
task.job_type = JobTask::JOB_TYPE_REQUEST_CONTINUATION;
task.job_id = JobTask::JOB_ID_DEFAULT;
task.request = request;
task.job = job;
jobs_queue_tab[priority].push(task);
WakeUpThread();
}
size_t Job::Size(int priority) const
// first thread (objects locked)
void Job::Add(long job_id, pt::Space & job, size_t priority)
{
CheckPriority(priority);
JobTask task;
task.job_type = JobTask::JOB_TYPE_DEFAULT;
task.job_id = job_id;
task.job = job;
jobs_queue_tab[priority].push(task);
WakeUpThread();
}
// first thread (objects locked)
void Job::Add(long job_id, Request * request, pt::Space & job, size_t priority)
{
CheckPriority(priority);
JobTask task;
task.job_type = JobTask::JOB_TYPE_REQUEST_CONTINUATION;
task.job_id = job_id;
task.request = request;
task.job = job;
jobs_queue_tab[priority].push(task);
WakeUpThread();
}
size_t Job::Size(size_t priority) const
{
CheckPriority(priority);
return jobs_queue_tab[priority].size();
@@ -81,14 +163,14 @@ size_t Job::Size() const
{
size_t sum = 0;
for(size_t i=0 ; i<WINIX_JOBS_HOW_MANY_PRIORITIES ; ++i)
for(size_t i=0 ; i <= PRIORITY_HIGHEST ; ++i)
sum += Size(i);
return sum;
}
bool Job::Empty(int priority) const
bool Job::Empty(size_t priority) const
{
CheckPriority(priority);
return jobs_queue_tab[priority].empty();
@@ -97,7 +179,7 @@ bool Job::Empty(int priority) const
bool Job::Empty() const
{
for(size_t i=0 ; i<WINIX_JOBS_HOW_MANY_PRIORITIES ; ++i)
for(size_t i=0 ; i <= PRIORITY_HIGHEST ; ++i)
if( !Empty(i) )
return false;
@@ -121,19 +203,20 @@ bool Job::SignalReceived()
// second thread (objects not locked)
void Job::Do()
{
size_t i = WINIX_JOBS_HOW_MANY_PRIORITIES;
size_t i = PRIORITY_HIGHEST + 1;
bool is_empty;
while( i-- > 0 && !IsExitSignal() )
{
do
{
Lock();
is_empty = Empty(i);
Unlock();
{
Winix::Lock lock(synchro);
is_empty = Empty(i);
}
if( !is_empty )
DoQueue(jobs_queue_tab[i]);
DoQueue(jobs_queue_tab[i], i);
}
while( !is_empty && !IsExitSignal() );
}
@@ -141,45 +224,144 @@ bool is_empty;
// second thread (objects not locked, jobs_queue is not empty)
void Job::DoQueue(JobsQueue & jobs_queue)
void Job::DoQueue(JobsQueue & jobs_queue, size_t priority)
{
bool is_empty;
do
{
Lock();
// references will not be invalidated after insertion to jobs_queue
// (jobs_queue is std::queue and it uses std::deque by default)
pt::Space & job = jobs_queue.front();
Unlock();
JobTask * job_task = nullptr;
DoJob(job);
{
Winix::Lock lock(synchro);
Lock();
jobs_queue.pop();
is_empty = jobs_queue.empty();
Unlock();
if( !jobs_queue.empty() )
{
job_task = &jobs_queue.front();
}
}
if( job_task )
{
DoJob(*job_task, priority);
{
Winix::Lock lock(synchro);
jobs_queue.pop();
is_empty = jobs_queue.empty();
}
}
}
while( !is_empty && !IsExitSignal() );
}
// second thread (objects not locked)
void Job::DoJob(pt::Space & job)
void Job::DoJob(JobTask & task, size_t priority)
{
PluginRes res;
try
{
PluginRes res = plugin->Call((Session*)0, WINIX_JOB, &job);
main_log << logsave;
if( task.job_type == JobTask::JOB_TYPE_REQUEST_CONTINUATION )
{
if( task.request )
{
Cur local_cur;
// is this correct? we can read the task.request->session ... from the second thread?
// chyba tak bo ta struktura zostaje na stosie specjalnie dla joba do uzytku
local_cur.request = task.request;
local_cur.session = task.request->session;
local_cur.mount = task.request->mount;
cur->request->run_state = Request::RunState::job_run;
res = plugin->Call(model_connector, &log, &local_cur, WINIX_JOB, &task.job, nullptr, task.job_type, task.job_id);
{
Winix::Lock lock(synchro);
cur->request = local_cur.request;
cur->session = local_cur.session;
cur->mount = local_cur.mount;
// winix templates functions uses its own cur (global pointer in TemplatesFunctions namespace)
// so we have to set it correctly
DoRequestContinuationJob(task, priority);
}
}
else
{
log << log2 << "Job: request continuation task doesn't have a request set, skipping the job and request continuation" << logend;
log << log2 << "Job: this is an internal error, the request if exists in the queue will never be removed" << logend;
}
}
else
{
res = plugin->Call(model_connector, &log, nullptr, WINIX_JOB, &task.job, nullptr, task.job_type, task.job_id);
}
log << logsave;
if( res.res_true == 0 )
DoWinixJob(job);
{
DoWinixJob(task.job); // probably this will be removed
}
}
catch(...)
{
log << log2 << "Job: an exception was catched when doing a job" << logend;
}
}
// second thread (objects locked)
// use main_log (after locking) for the logs to be in the correct order
void Job::DoRequestContinuationJob(JobTask & job_task, size_t priority)
{
if( cur->request->function )
{
main_log << config->log_delimiter << logend;
main_log << log3 << "Job: making a continuation for request " << cur->request << logend;
main_log << log4 << "Job: doing directory analysis again" << logend;
if( functions->ParseOnlyDirs() )
{
cur->mount = mounts->CalcCurMount(cur->request);
cur->request->mount = cur->mount;
cur->request->run_state = Request::RunState::job_continuation_run;
functions->ContinueMakeFunction();
}
}
else
{
main_log << log2 << "Job: request continuation task doesn't have a funtion set, return 500 internal error" << logend;
cur->request->http_status = Header::status_500_internal_server_error;
}
if( cur->request->run_state == Request::RunState::assigned_to_job )
{
log << log3 << "Job: this request (" << cur->request << ") has been moved to the job queue again" << logend;
Add(cur->request->job_id, cur->request, cur->request->job, priority);
}
else
{
cur->request->FinishRequest(); // jak cur->request->function bedzie null to nie zadzialaja funkcje templajtowe
load_avg->StopRequest(cur->request);
cur->request->Clear();
cur->request->run_state = Request::RunState::finished;
RemoveOldRequest(cur->request);
}
main_log << logendrequest;
}
// second thread (objects not locked)
void Job::DoWinixJob(pt::Space & job)
{
@@ -188,6 +370,28 @@ void Job::DoWinixJob(pt::Space & job)
// second thread (objects locked)
// use main_log for the logs to be in the correct order
void Job::RemoveOldRequest(Request * request)
{
std::list<Request>::iterator i = req_tab->begin();
while( i != req_tab->end() )
{
if( &(*i) == request )
{
main_log << log3 << "Job: removing request " << request << logend;
req_tab->erase(i);
break;
}
else
{
++i;
}
}
}
} // namespace Winix