/* * This file is a part of Winix * and is distributed under the 2-Clause BSD licence. * Author: Tomasz Sowa */ /* * Copyright (c) 2012-2022, Tomasz Sowa * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * */ #include "job.h" #include "plugin.h" #include "log.h" #include "functions/functions.h" #include "lock.h" namespace Winix { Job::Job() { jobs_queue_tab.resize(PRIORITY_HIGHEST + 1); cur = nullptr; functions = nullptr; mounts = nullptr; load_avg = nullptr; req_tab = nullptr; } void Job::SetCur(Cur * cur) { this->cur = cur; } void Job::SetFunctions(Functions * functions) { this->functions = functions; } void Job::SetLoadAvg(LoadAvg * load_avg) { this->load_avg = load_avg; } void Job::SetMounts(Mounts * mounts) { this->mounts = mounts; } void Job::SetReqTab(std::list * req_tab) { this->req_tab = req_tab; } void Job::CheckPriority(size_t & priority) const { if( priority > PRIORITY_HIGHEST ) priority = PRIORITY_HIGHEST; } // first thread (objects locked) void Job::Add(pt::Space & job, size_t priority) { CheckPriority(priority); JobTask task; task.job_type = JobTask::JOB_TYPE_DEFAULT; task.job_id = JobTask::JOB_ID_DEFAULT; task.job = job; jobs_queue_tab[priority].push(task); WakeUpThread(); } // first thread (objects locked) void Job::Add(Request * request, pt::Space & job, size_t priority) { CheckPriority(priority); JobTask task; task.job_type = JobTask::JOB_TYPE_REQUEST_CONTINUATION; task.job_id = JobTask::JOB_ID_DEFAULT; task.request = request; task.job = job; jobs_queue_tab[priority].push(task); WakeUpThread(); } // first thread (objects locked) void Job::Add(long job_id, pt::Space & job, size_t priority) { CheckPriority(priority); JobTask task; task.job_type = JobTask::JOB_TYPE_DEFAULT; task.job_id = job_id; task.job = job; jobs_queue_tab[priority].push(task); WakeUpThread(); } // first thread (objects locked) void Job::Add(long job_id, Request * request, pt::Space & job, size_t priority) { CheckPriority(priority); JobTask task; task.job_type = JobTask::JOB_TYPE_REQUEST_CONTINUATION; task.job_id = job_id; task.request = request; task.job = job; jobs_queue_tab[priority].push(task); WakeUpThread(); } size_t Job::Size(size_t priority) const { CheckPriority(priority); return jobs_queue_tab[priority].size(); } size_t Job::Size() const { size_t sum = 0; for(size_t i=0 ; i <= PRIORITY_HIGHEST ; ++i) sum += Size(i); return sum; } bool Job::Empty(size_t priority) const { CheckPriority(priority); return jobs_queue_tab[priority].empty(); } bool Job::Empty() const { for(size_t i=0 ; i <= PRIORITY_HIGHEST ; ++i) if( !Empty(i) ) return false; return true; } /* second thread */ // second thread (objects locked) bool Job::SignalReceived() { return !Empty(); } // second thread (objects not locked) void Job::Do() { size_t i = PRIORITY_HIGHEST + 1; bool is_empty; while( i-- > 0 && !IsExitSignal() ) { do { { Winix::Lock lock(synchro); is_empty = Empty(i); } if( !is_empty ) DoQueue(jobs_queue_tab[i], i); } while( !is_empty && !IsExitSignal() ); } } // second thread (objects not locked, jobs_queue is not empty) void Job::DoQueue(JobsQueue & jobs_queue, size_t priority) { bool is_empty; do { // references will not be invalidated after insertion to jobs_queue // (jobs_queue is std::queue and it uses std::deque by default) JobTask * job_task = nullptr; { Winix::Lock lock(synchro); if( !jobs_queue.empty() ) { job_task = &jobs_queue.front(); } } if( job_task ) { DoJob(*job_task, priority); { Winix::Lock lock(synchro); jobs_queue.pop(); is_empty = jobs_queue.empty(); } } } while( !is_empty && !IsExitSignal() ); } // second thread (objects not locked) void Job::DoJob(JobTask & task, size_t priority) { PluginRes res; try { main_log << logsave; if( task.job_type == JobTask::JOB_TYPE_REQUEST_CONTINUATION ) { if( task.request ) { Cur local_cur; // is this correct? we can read the task.request->session ... from the second thread? // chyba tak bo ta struktura zostaje na stosie specjalnie dla joba do uzytku local_cur.request = task.request; local_cur.session = task.request->session; local_cur.mount = task.request->mount; cur->request->run_state = Request::RunState::job_run; res = plugin->Call(model_connector, &log, &local_cur, WINIX_JOB, &task.job, nullptr, task.job_type, task.job_id); log << logsave; { Winix::Lock lock(synchro); cur->request = local_cur.request; cur->session = local_cur.session; cur->mount = local_cur.mount; // winix templates functions uses its own cur (global pointer in TemplatesFunctions namespace) // so we have to set it correctly DoRequestContinuationJob(task, priority); } } else { log << log2 << "Job: request continuation task doesn't have a request set, skipping the job and request continuation" << logend; log << log2 << "Job: this is an internal error, the request if exists in the queue will never be removed" << logend; } } else { res = plugin->Call(model_connector, &log, nullptr, WINIX_JOB, &task.job, nullptr, task.job_type, task.job_id); } log << logsave; if( res.res_true == 0 ) { DoWinixJob(task.job); // probably this will be removed } } catch(...) { log << log2 << "Job: an exception was catched when doing a job" << logend; } } // second thread (objects locked) // use main_log (after locking) for the logs to be in the correct order void Job::DoRequestContinuationJob(JobTask & job_task, size_t priority) { if( cur->request->function ) { main_log << config->log_delimiter << logend; main_log << log3 << "Job: making a continuation for request " << cur->request << logend; main_log << log4 << "Job: doing directory analysis again" << logend; if( functions->ParseOnlyDirs() ) { cur->mount = mounts->CalcCurMount(cur->request); cur->request->mount = cur->mount; cur->request->run_state = Request::RunState::job_continuation_run; functions->ContinueMakeFunction(); if( cur->request->run_state == Request::RunState::assigned_to_job ) { log << log3 << "Job: this request (" << cur->request << ") has been moved to the job queue again" << logend; Add(cur->request->job_id, cur->request, cur->request->job, priority); } } else { main_log << log4 << "Job: directories have not been correctly prepared, finishing the request" << logend; cur->request->http_status = Header::status_500_internal_server_error; cur->request->run_state = Request::RunState::prepare_to_finish; } } else { main_log << log2 << "Job: request continuation task doesn't have a funtion set, return 500 internal error" << logend; cur->request->http_status = Header::status_500_internal_server_error; cur->request->run_state = Request::RunState::prepare_to_finish; } if( cur->request->run_state != Request::RunState::assigned_to_job ) { cur->request->FinishRequest(); // jak cur->request->function bedzie null to nie zadzialaja funkcje templajtowe load_avg->StopRequest(cur->request); cur->request->Clear(); cur->request->run_state = Request::RunState::finished; RemoveOldRequest(cur->request); } main_log << logendrequest; } // second thread (objects not locked) void Job::DoWinixJob(pt::Space & job) { //log << log1 << "standard winix job: " << job.Text(L"type") << logend; } // second thread (objects locked) // use main_log for the logs to be in the correct order void Job::RemoveOldRequest(Request * request) { std::list::iterator i = req_tab->begin(); while( i != req_tab->end() ) { if( &(*i) == request ) { main_log << log3 << "Job: removing request " << request << logend; req_tab->erase(i); break; } else { ++i; } } } } // namespace Winix