Add multithreading support

Make QPromise thread safe but also ensure that continuation lambdas (then/fail/finally) are called in the thread of the promise instance they are attached to.
This commit is contained in:
Simon Brunel 2017-06-04 18:22:24 +02:00
parent 2d9961de82
commit 81625e1964
7 changed files with 281 additions and 45 deletions

View File

@ -103,13 +103,13 @@ QFuture<int> future = QtConcurrent::run([]() {
return 42; return 42;
}); });
QPromise<int> promise = qtPromise(future); QPromise<int> promise = qPromise(future);
``` ```
or simply: or simply:
```cpp ```cpp
auto promise = qtPromise(QtConcurrent::run([]() { auto promise = qPromise(QtConcurrent::run([]() {
// {...} // {...}
})); }));
``` ```
@ -154,6 +154,11 @@ promise.then([](int res) {
}); });
``` ```
## Thread-Safety
QPromise is thread-safe and can be copied and accessed across different threads. QPromise relies on [explicitly data sharing](http://doc.qt.io/qt-5/qexplicitlyshareddatapointer.html#details) and thus `auto p2 = p1` represents the same promise: when `p1` resolves, handlers registered on `p1` and `p2` are called, the fulfilled value being shared between both instances.
> **Note:** while it's safe to access the resolved value from different threads using [`then`](#qpromise-then), QPromise provides no guarantee about the object being pointed to. Thread-safety and reentrancy rules for that object still apply.
## QPromise ## QPromise
### <a name="qpromise-qpromise"></a> `QPromise<T>::QPromise(resolver)` ### <a name="qpromise-qpromise"></a> `QPromise<T>::QPromise(resolver)`
Creates a new promise that will be fulfilled or rejected by the given `resolver` lambda: Creates a new promise that will be fulfilled or rejected by the given `resolver` lambda:

View File

@ -6,10 +6,14 @@
#include "qpromiseglobal.h" #include "qpromiseglobal.h"
// Qt // Qt
#include <QTimer> #include <QCoreApplication>
#include <QAbstractEventDispatcher>
#include <QThread>
#include <QVector>
#include <QReadWriteLock>
#include <QSharedPointer> #include <QSharedPointer>
#include <QSharedData> #include <QSharedData>
#include <QVector> #include <QPointer>
namespace QtPromise { namespace QtPromise {
@ -26,10 +30,22 @@ class QPromiseReject;
namespace QtPromisePrivate { namespace QtPromisePrivate {
// https://stackoverflow.com/a/21653558
template <typename F> template <typename F>
inline void qtpromise_defer(F&& f) static void qtpromise_defer(F&& f, QThread* thread = nullptr)
{ {
QTimer::singleShot(0, std::forward<F>(f)); struct Event : public QEvent
{
using FType = typename std::decay<F>::type;
Event(FType&& f) : QEvent(QEvent::None), m_f(std::move(f)) { }
Event(const FType& f) : QEvent(QEvent::None), m_f(f) { }
~Event() { m_f(); }
FType m_f;
};
QObject* target = QAbstractEventDispatcher::instance(thread);
Q_ASSERT_X(target, "postMetaCall", "Target thread must have an event loop");
QCoreApplication::postEvent(target, new Event(std::forward<F>(f)));
} }
template <typename T> template <typename T>
@ -320,21 +336,35 @@ class PromiseDataBase: public QSharedData
{ {
public: public:
using Error = QtPromise::QPromiseError; using Error = QtPromise::QPromiseError;
using Catcher = std::function<void(const Error&)>; using Catcher = std::pair<QPointer<QThread>, std::function<void(const Error&)> >;
virtual ~PromiseDataBase() {} virtual ~PromiseDataBase() {}
bool isFulfilled() const { return m_settled && m_error.isNull(); } bool isFulfilled() const
bool isRejected() const { return m_settled && !m_error.isNull(); }
bool isPending() const { return !m_settled; }
void addCatcher(Catcher catcher)
{ {
m_catchers.append(std::move(catcher)); return !isPending() && m_error.isNull();
}
bool isRejected() const
{
return !isPending() && !m_error.isNull();
}
bool isPending() const
{
QReadLocker lock(&m_lock);
return !m_settled;
}
void addCatcher(std::function<void(const Error&)> catcher)
{
QWriteLocker lock(&m_lock);
m_catchers.append({QThread::currentThread(), std::move(catcher)});
} }
void reject(Error error) void reject(Error error)
{ {
Q_ASSERT(isPending());
Q_ASSERT(m_error.isNull()); Q_ASSERT(m_error.isNull());
m_error.reset(new Error(std::move(error))); m_error.reset(new Error(std::move(error)));
setSettled(); setSettled();
@ -342,26 +372,36 @@ public:
void dispatch() void dispatch()
{ {
Q_ASSERT(!isPending()); if (isPending()) {
return;
}
if (isFulfilled()) { if (m_error.isNull()) {
notify(); notify();
return; return;
} }
Q_ASSERT(isRejected()); m_lock.lockForWrite();
QSharedPointer<Error> error = m_error;
QVector<Catcher> catchers(std::move(m_catchers)); QVector<Catcher> catchers(std::move(m_catchers));
m_lock.unlock();
QSharedPointer<Error> error = m_error;
Q_ASSERT(!error.isNull());
for (const auto& catcher: catchers) { for (const auto& catcher: catchers) {
const auto& fn = catcher.second;
qtpromise_defer([=]() { qtpromise_defer([=]() {
catcher(*error); fn(*error);
}); }, catcher.first);
} }
} }
protected: protected:
mutable QReadWriteLock m_lock;
void setSettled() void setSettled()
{ {
QWriteLocker lock(&m_lock);
Q_ASSERT(!m_settled); Q_ASSERT(!m_settled);
m_settled = true; m_settled = true;
} }
@ -377,18 +417,13 @@ private:
template <typename T> template <typename T>
class PromiseData: public PromiseDataBase<T> class PromiseData: public PromiseDataBase<T>
{ {
using Handler = std::pair<QPointer<QThread>, std::function<void(const T&)> >;
public: public:
using Handler = std::function<void(const T&)>; void addHandler(std::function<void(const T&)> handler)
void addHandler(Handler handler)
{ {
m_handlers.append(std::move(handler)); QWriteLocker lock(&this->m_lock);
} m_handlers.append({QThread::currentThread(), std::move(handler)});
const T& value() const
{
Q_ASSERT(!m_value.isNull());
return *m_value;
} }
void resolve(T&& value) void resolve(T&& value)
@ -407,13 +442,18 @@ public:
void notify() Q_DECL_OVERRIDE void notify() Q_DECL_OVERRIDE
{ {
Q_ASSERT(this->isFulfilled()); this->m_lock.lockForWrite();
QSharedPointer<T> value(m_value);
QVector<Handler> handlers(std::move(m_handlers)); QVector<Handler> handlers(std::move(m_handlers));
this->m_lock.unlock();
QSharedPointer<T> value(m_value);
Q_ASSERT(!value.isNull());
for (const auto& handler: handlers) { for (const auto& handler: handlers) {
const auto& fn = handler.second;
qtpromise_defer([=]() { qtpromise_defer([=]() {
handler(*value); fn(*value);
}); }, handler.first);
} }
} }
@ -425,12 +465,13 @@ private:
template <> template <>
class PromiseData<void>: public PromiseDataBase<void> class PromiseData<void>: public PromiseDataBase<void>
{ {
public: using Handler = std::pair<QPointer<QThread>, std::function<void()> >;
using Handler = std::function<void()>;
void addHandler(Handler handler) public:
void addHandler(std::function<void()> handler)
{ {
m_handlers.append(std::move(handler)); QWriteLocker lock(&m_lock);
m_handlers.append({QThread::currentThread(), std::move(handler)});
} }
void resolve() { setSettled(); } void resolve() { setSettled(); }
@ -438,12 +479,12 @@ public:
protected: protected:
void notify() Q_DECL_OVERRIDE void notify() Q_DECL_OVERRIDE
{ {
Q_ASSERT(isFulfilled()); this->m_lock.lockForWrite();
QVector<Handler> handlers(std::move(m_handlers)); QVector<Handler> handlers(std::move(m_handlers));
this->m_lock.unlock();
for (const auto& handler: handlers) { for (const auto& handler: handlers) {
qtpromise_defer([=]() { qtpromise_defer(handler.second, handler.first);
handler();
});
} }
} }

View File

@ -4,4 +4,5 @@ SUBDIRS += \
future \ future \
helpers \ helpers \
qpromise \ qpromise \
requirements requirements \
thread

View File

@ -1,4 +1,3 @@
QT += concurrent
TARGET = tst_benchmark TARGET = tst_benchmark
SOURCES += $$PWD/tst_benchmark.cpp SOURCES += $$PWD/tst_benchmark.cpp

View File

@ -2,7 +2,6 @@
#include <QtPromise> #include <QtPromise>
// Qt // Qt
#include <QtConcurrent>
#include <QtTest> #include <QtTest>
using namespace QtPromise; using namespace QtPromise;
@ -11,11 +10,11 @@ class tst_benchmark: public QObject
{ {
Q_OBJECT Q_OBJECT
private Q_SLOTS:
void valueResolve(); void valueResolve();
void valueReject(); void valueReject();
void valueThen(); void valueThen();
void errorReject(); void errorReject();
private Q_SLOTS:
void errorThen(); void errorThen();
}; // class tst_benchmark }; // class tst_benchmark

View File

@ -0,0 +1,5 @@
QT += concurrent
TARGET = tst_thread
SOURCES += $$PWD/tst_thread.cpp
include(../tests.pri)

View File

@ -0,0 +1,186 @@
// QtPromise
#include <QtPromise>
// Qt
#include <QtConcurrent>
#include <QtTest>
using namespace QtPromise;
class tst_thread: public QObject
{
Q_OBJECT
private Q_SLOTS:
void resolve();
void resolve_void();
void reject();
void then();
void then_void();
void fail();
void finally();
}; // class tst_thread
QTEST_MAIN(tst_thread)
#include "tst_thread.moc"
void tst_thread::resolve()
{
int value = -1;
size_t target = 0;
size_t source = 0;
QPromise<int>([&](const QPromiseResolve<int>& resolve) {
QtConcurrent::run([=, &source]() {
source = (size_t)QThread::currentThread();
resolve(42);
});
}).then([&](int res) {
target = (size_t)QThread::currentThread();
value = res;
}).wait();
QVERIFY(source != 0);
QVERIFY(source != target);
QCOMPARE(target, (size_t)QThread::currentThread());
QCOMPARE(value, 42);
}
void tst_thread::resolve_void()
{
int value = -1;
size_t target = 0;
size_t source = 0;
QPromise<void>([&](const QPromiseResolve<void>& resolve) {
QtConcurrent::run([=, &source]() {
source = (size_t)QThread::currentThread();
resolve();
});
}).then([&]() {
target = (size_t)QThread::currentThread();
value = 43;
}).wait();
QVERIFY(source != 0);
QVERIFY(source != target);
QCOMPARE(target, (size_t)QThread::currentThread());
QCOMPARE(value, 43);
}
void tst_thread::reject()
{
QString error;
size_t target = 0;
size_t source = 0;
QPromise<int>([&](const QPromiseResolve<int>&, const QPromiseReject<int>& reject) {
QtConcurrent::run([=, &source]() {
source = (size_t)QThread::currentThread();
reject(QString("foo"));
});
}).fail([&](const QString& err) {
target = (size_t)QThread::currentThread();
error = err;
return -1;
}).wait();
QVERIFY(source != 0);
QVERIFY(source != target);
QCOMPARE(target, (size_t)QThread::currentThread());
QCOMPARE(error, QString("foo"));
}
void tst_thread::then()
{
size_t source;
QPromise<int> p([&](const QPromiseResolve<int>& resolve) {
source = (size_t)QThread::currentThread();
resolve(42);
});
size_t target;
int value = -1;
qPromise(QtConcurrent::run([&](const QPromise<int>& p) {
p.then([&](int res) {
target = (size_t)QThread::currentThread();
value = res;
}).wait();
}, p)).wait();
QVERIFY(target != 0);
QVERIFY(source != target);
QCOMPARE(source, (size_t)QThread::currentThread());
QCOMPARE(value, 42);
}
void tst_thread::then_void()
{
size_t source;
QPromise<void> p([&](const QPromiseResolve<void>& resolve) {
source = (size_t)QThread::currentThread();
resolve();
});
size_t target;
int value = -1;
qPromise(QtConcurrent::run([&](const QPromise<void>& p) {
p.then([&]() {
target = (size_t)QThread::currentThread();
value = 43;
}).wait();
}, p)).wait();
QVERIFY(target != 0);
QVERIFY(source != target);
QCOMPARE(source, (size_t)QThread::currentThread());
QCOMPARE(value, 43);
}
void tst_thread::fail()
{
size_t source;
QPromise<int> p([&](const QPromiseResolve<int>&, const QPromiseReject<int>& reject) {
source = (size_t)QThread::currentThread();
reject(QString("foo"));
});
size_t target;
QString error;
qPromise(QtConcurrent::run([&](const QPromise<int>& p) {
p.fail([&](const QString& err) {
target = (size_t)QThread::currentThread();
error = err;
return -1;
}).wait();
}, p)).wait();
QVERIFY(target != 0);
QVERIFY(source != target);
QCOMPARE(source, (size_t)QThread::currentThread());
QCOMPARE(error, QString("foo"));
}
void tst_thread::finally()
{
size_t source;
QPromise<int> p([&](const QPromiseResolve<int>& resolve) {
source = (size_t)QThread::currentThread();
resolve(42);
});
size_t target;
int value = -1;
qPromise(QtConcurrent::run([&](const QPromise<int>& p) {
p.finally([&]() {
target = (size_t)QThread::currentThread();
value = 43;
}).wait();
}, p)).wait();
QVERIFY(target != 0);
QVERIFY(source != target);
QCOMPARE(source, (size_t)QThread::currentThread());
QCOMPARE(value, 43);
}