add support for nested transactions

while here:
- don't allow to copy/move DbConnector
This commit is contained in:
Tomasz Sowa 2022-05-26 20:14:14 +02:00
parent 907f10671d
commit 1d4de8abe2
5 changed files with 275 additions and 49 deletions

View File

@ -51,16 +51,9 @@ DbConnector::DbConnector()
expression_allocated = false;
log = nullptr;
log_queries = false;
transaction_index = 0;
}
DbConnector::DbConnector(const DbConnector &)
{
db_expression = nullptr;
expression_allocated = false;
log = nullptr;
}
DbConnector::~DbConnector()
{
deallocate_expression();
@ -173,6 +166,206 @@ bool DbConnector::query_remove(const pt::TextStream & stream, QueryResult & quer
}
bool DbConnector::begin()
{
bool status = false;
if( transaction_index > 0 )
{
pt::TextStream str;
str << "SAVEPOINT savepoint_" << transaction_index;
status = DbConnector::query(str);
}
else
{
status = DbConnector::query("BEGIN");
}
if( status )
{
transaction_index += 1;
}
else
{
if( log )
{
(*log) << pt::Log::log1 << "Morm: I cannot start a transaction" << pt::Log::logend;
}
}
return status;
}
bool DbConnector::begin_if_needed()
{
bool status = true;
if( transaction_index == 0 )
{
status = begin();
}
return status;
}
bool DbConnector::rollback()
{
return rollback(transaction_index);
}
bool DbConnector::commit()
{
return commit(transaction_index);
}
bool DbConnector::rollback_one_transaction(size_t index)
{
bool status = false;
if( index > 1 )
{
pt::TextStream str;
str << "ROLLBACK TO SAVEPOINT savepoint_" << (index - 1);
status = DbConnector::query(str);
transaction_index = index - 1; // decrement it even if rollback failed
}
else
if( index == 1 )
{
status = DbConnector::query("ROLLBACK");
transaction_index = 0;
}
return status;
}
bool DbConnector::rollback(size_t index)
{
bool status = false;
if( index == 0 )
{
if( log )
{
(*log) << pt::Log::log1 << "Morm: there is no a transaction with zero index - skipping rollback";
}
}
else
if( index > transaction_index )
{
if( log )
{
(*log) << pt::Log::log1 << "Morm: transaction";
if( index > 1 )
(*log) << " for savepoint_" << (index-1);
(*log) << " does not exist - skipping rollback" << pt::Log::logend;
}
}
else
{
status = true;
for(size_t i = transaction_index ; i >= index ; --i)
{
if( !rollback_one_transaction(i) )
{
/*
* return false if at least one rollback failed
*/
status = false;
}
}
}
return status;
}
bool DbConnector::commit_one_transaction(size_t index)
{
bool status = false;
if( index > 1 )
{
pt::TextStream str;
str << "RELEASE SAVEPOINT savepoint_" << (index - 1);
status = DbConnector::query(str);
transaction_index = index - 1;
}
else
if( index == 1 )
{
status = DbConnector::query("COMMIT");
transaction_index = 0;
}
return status;
}
bool DbConnector::commit(size_t index)
{
bool status = false;
if( index == 0 )
{
if( log )
{
(*log) << pt::Log::log1 << "Morm: there is no a transaction with zero index - skipping commit";
}
}
else
if( index > transaction_index )
{
if( log )
{
(*log) << pt::Log::log1 << "Morm: transaction";
if( index > 1 )
(*log) << " for savepoint_" << (index-1);
(*log) << " does not exist - skipping commit" << pt::Log::logend;
}
}
else
{
status = true;
for(size_t i = transaction_index ; i >= index ; --i)
{
if( !commit_one_transaction(i) )
{
/*
* return false if at least one commit failed
*/
status = false;
}
}
}
return status;
}
size_t DbConnector::get_transaction_index()
{
return transaction_index;
}
DbExpression * DbConnector::get_expression()
{

View File

@ -52,7 +52,8 @@ class DbConnector
public:
DbConnector();
DbConnector(const DbConnector &);
DbConnector(const DbConnector &) = delete;
DbConnector(DbConnector &&) = delete;
virtual ~DbConnector();
virtual void set_logger(pt::Log * log);
@ -91,6 +92,39 @@ public:
virtual bool query_insert(const pt::TextStream & stream, QueryResult & query_result);
virtual bool query_remove(const pt::TextStream & stream, QueryResult & query_result);
/*
* create a new transaction
* first transaction has index equal to one
*/
virtual bool begin();
/*
* create a new transaction if there is no a transaction started yet
*/
virtual bool begin_if_needed();
/*
* rollback or commit the last transaction
*/
virtual bool rollback();
virtual bool commit();
/*
* rollback or commit all transactions from the last one to the index
* (first transaction has index one, there is no a transaction with index zero)
*
*/
virtual bool rollback(size_t index);
virtual bool commit(size_t index);
/*
* return current transaction index
* first transaction has index equal to one
* returns zero if there are no any transactions
*/
virtual size_t get_transaction_index();
virtual void get_value(const char * value_str, char & field_value, const FT & field_type = FT::default_type);
virtual void get_value(const char * value_str, unsigned char & field_value, const FT & field_type = FT::default_type);
virtual void get_value(const char * value_str, wchar_t & field_value, const FT & field_type = FT::default_type);
@ -141,6 +175,7 @@ protected:
bool expression_allocated;
pt::Log * log;
bool log_queries;
size_t transaction_index;
virtual void allocate_default_expression() = 0;
@ -161,6 +196,9 @@ protected:
virtual void unescape_bin_string(const char * str, std::string & out);
virtual void unescape_bin_string(const char * str, std::wstring & out);
virtual bool rollback_one_transaction(size_t index);
virtual bool commit_one_transaction(size_t index);
private:
@ -169,8 +207,6 @@ private:
};

View File

@ -302,7 +302,6 @@ bool PostgreSQLConnector::query_remove(const pt::TextStream & stream, QueryResul
//void PostgreSQLConnector::CreateIdList(const std::vector<long> & id_tab, std::wstring & list, bool add_parentheses)
//{
//wchar_t buffer[50];

View File

@ -44,6 +44,7 @@ Transaction::Transaction(ModelConnector * model_connector)
this->model_connector = model_connector;
is_transaction_started = false;
is_transaction_successful = true;
transaction_index = 0;
begin();
}
@ -53,6 +54,7 @@ Transaction::Transaction(ModelConnector * model_connector, bool auto_begin_trans
this->model_connector = model_connector;
is_transaction_started = false;
is_transaction_successful = true;
transaction_index = 0;
if( auto_begin_transaction )
{
@ -106,6 +108,7 @@ void Transaction::set_successful(bool is_successful)
bool Transaction::begin()
{
bool status = false;
pt::Log * log = get_logger();
if( is_transaction_started )
@ -118,7 +121,16 @@ bool Transaction::begin()
rollback(); // what if there is an error here? skip it at the moment
}
bool status = do_query("BEGIN");
if( model_connector )
{
DbConnector * db_connector = model_connector->get_db_connector();
if( db_connector )
{
status = db_connector->begin();
transaction_index = db_connector->get_transaction_index();
}
}
if( status )
{
@ -129,11 +141,6 @@ bool Transaction::begin()
{
is_transaction_started = false;
is_transaction_successful = false;
if( log )
{
(*log) << pt::Log::log1 << "Morm: I cannot start a transaction" << pt::Log::logend;
}
}
return status;
@ -159,12 +166,18 @@ bool Transaction::rollback()
if( is_transaction_started )
{
status = do_query("ROLLBACK");
if( status )
if( model_connector )
{
is_transaction_started = false;
DbConnector * db_connector = model_connector->get_db_connector();
if( db_connector )
{
status = db_connector->rollback(transaction_index);
}
}
// set it to false even if rollack failed
is_transaction_started = false;
}
else
{
@ -186,12 +199,18 @@ bool Transaction::commit()
if( is_transaction_started )
{
status = do_query("COMMIT");
if( status )
if( model_connector )
{
is_transaction_started = false;
DbConnector * db_connector = model_connector->get_db_connector();
if( db_connector )
{
status = db_connector->commit(transaction_index);
}
}
// set it to false even if commit failed
is_transaction_started = false;
}
else
{
@ -220,27 +239,6 @@ bool Transaction::finish()
}
bool Transaction::do_query(const char * query)
{
bool status = false;
if( model_connector )
{
DbConnector * db_connector = model_connector->get_db_connector();
if( db_connector )
{
status = db_connector->query(query);
}
}
return status;
}
pt::Log * Transaction::get_logger()
{
pt::Log * logger = nullptr;

View File

@ -68,9 +68,9 @@ protected:
ModelConnector * model_connector;
bool is_transaction_started;
bool is_transaction_successful;
size_t transaction_index;
bool do_query(const char * query);
pt::Log * get_logger();
virtual pt::Log * get_logger();
private: