[libgee] Add futures and promises to libgee
- From: Maciej Marcin Piechotka <mpiechotka src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libgee] Add futures and promises to libgee
- Date: Thu, 18 Apr 2013 11:26:51 +0000 (UTC)
commit 6af1994ac0219878eadf159a02e714b2c0cf5baa
Author: Maciej Piechotka <uzytkownik2 gmail com>
Date: Thu Apr 18 13:24:21 2013 +0200
Add futures and promises to libgee
configure.ac | 2 +-
gee/Makefile.am | 6 ++
gee/flatmapfuture.vala | 141 ++++++++++++++++++++++++++++++++++++++++
gee/future.vala | 172 +++++++++++++++++++++++++++++++++++++++++++++++++
gee/mapfuture.vala | 142 ++++++++++++++++++++++++++++++++++++++++
gee/promise.vala | 123 +++++++++++++++++++++++++++++++++++
gee/task.vala | 93 ++++++++++++++++++++++++++
7 files changed, 678 insertions(+), 1 deletion(-)
---
diff --git a/configure.ac b/configure.ac
index 4ecc014..2e99749 100644
--- a/configure.ac
+++ b/configure.ac
@@ -51,7 +51,7 @@ AM_CONDITIONAL(ENABLE_BENCHMARK, test x$enable_benchmark = xyes)
AS_IF([test x$enable_benchmark = xyes],
[VALA_ADD_CHECKFILE([benchmark/benchmarks_vala.stamp])])
-PKG_CHECK_MODULES(GLIB, glib-2.0 >= $GLIB_REQUIRED gobject-2.0 >= $GLIB_REQUIRED)
+PKG_CHECK_MODULES(GLIB, glib-2.0 >= $GLIB_REQUIRED gobject-2.0 >= $GLIB_REQUIRED gio-2.0 >= $GLIB_REQUIRED)
AC_SUBST(GLIB_CFLAGS)
AC_SUBST(GLIB_LIBS)
diff --git a/gee/Makefile.am b/gee/Makefile.am
index a4295a5..115920f 100644
--- a/gee/Makefile.am
+++ b/gee/Makefile.am
@@ -32,6 +32,8 @@ libgee_0_8_la_SOURCES = \
concurrentset.vala \
deque.vala \
functions.vala \
+ future.vala \
+ flatmapfuture.vala \
hashable.vala \
hashmap.vala \
hashmultimap.vala \
@@ -45,10 +47,12 @@ libgee_0_8_la_SOURCES = \
list.vala \
listiterator.vala \
map.vala \
+ mapfuture.vala \
mapiterator.vala \
multimap.vala \
multiset.vala \
priorityqueue.vala \
+ promise.vala \
queue.vala \
readonlybidirlist.vala \
readonlybidirsortedset.vala \
@@ -64,6 +68,7 @@ libgee_0_8_la_SOURCES = \
set.vala \
sortedmap.vala \
sortedset.vala \
+ task.vala \
timsort.vala \
traversable.vala \
treemap.vala \
@@ -78,6 +83,7 @@ libgee_0_8_la_VALAFLAGS = \
-h gee-internals.h \
--internal-vapi gee-internals-0.8.vapi \
--library gee-0.8 --gir Gee-0.8.gir \
+ --pkg gio-2.0 \
$(COVERAGE_VALAFLAGS) \
$(VALAFLAGS) \
$(NULL)
diff --git a/gee/flatmapfuture.vala b/gee/flatmapfuture.vala
new file mode 100644
index 0000000..5df8149
--- /dev/null
+++ b/gee/flatmapfuture.vala
@@ -0,0 +1,141 @@
+/* flatmapfuture.vala
+ *
+ * Copyright (C) 2013 Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Author:
+ * Maciej Piechotka <uzytkownik2 gmail com>
+ */
+
+/**
+ * Future returned by Future.flatMap.
+ *
+ * It waits for the base future, applies the mapping function to its value
+ * and then forwards every operation to the future returned by that function.
+ *
+ * The mutex guards _progress, _mapped and _when_done. It is never held while
+ * the mapping function or a registered callback runs.
+ */
+internal class Gee.FlatMapFuture<A, G> : Object, Future<A> {
+	/**
+	 * Creates the mapped future.
+	 *
+	 * @param base_future Future whose value is passed to func
+	 * @param func Function producing the future this object forwards to
+	 */
+	public FlatMapFuture (Future<G> base_future, Future.FlatMapFunc<A, G> func) {
+		_base = base_future;
+		_func = func;
+		// Map eagerly once the base is ready, unless a waiter already started it.
+		_base.when_done (() => {
+			_mutex.lock ();
+			if (_progress == Progress.INIT) {
+				// go_map () releases the mutex.
+				go_map ();
+			} else {
+				_mutex.unlock ();
+			}
+		});
+	}
+
+	/**
+	 * True once the mapping function has returned and the future it
+	 * returned is itself ready.
+	 */
+	public bool ready {
+		get {
+			_mutex.lock ();
+			bool mapped = _progress == Progress.READY;
+			_mutex.unlock ();
+			// _mapped is assigned before _progress becomes READY.
+			return mapped && _mapped.ready;
+		}
+	}
+
+	/**
+	 * Blocks until the mapped future has a value.
+	 *
+	 * @return The value of the future returned by the mapping function
+	 */
+	public unowned A wait () {
+		_mutex.lock ();
+		if (_progress == Progress.INIT) {
+			// go_map () releases the mutex and blocks on the base value.
+			return go_map ().wait ();
+		}
+		// Loop on the predicate: condition variables may wake spuriously.
+		while (_progress != Progress.READY) {
+			_cond.wait (_mutex);
+		}
+		_mutex.unlock ();
+		return _mapped.wait ();
+	}
+
+	/**
+	 * Blocks until the mapped future has a value or end_time has passed.
+	 *
+	 * @param end_time Monotonic deadline
+	 * @param value The value if the wait was successful, null otherwise
+	 * @return true if the value became available before the deadline
+	 */
+	public bool wait_until (int64 end_time, out unowned A? value = null) {
+		value = null;
+		_mutex.lock ();
+		if (_progress == Progress.INIT) {
+			// Keep the base alive locally: go_map () on another thread clears _base.
+			Future<G> base_future = _base;
+			_mutex.unlock ();
+			// Honour the deadline while the base is still pending instead of
+			// blocking on it without limit inside go_map ().
+			if (!base_future.wait_until (end_time)) {
+				return false;
+			}
+			_mutex.lock ();
+			if (_progress == Progress.INIT) {
+				return go_map ().wait_until (end_time, out value);
+			}
+		}
+		while (_progress != Progress.READY) {
+			if (!_cond.wait_until (_mutex, end_time)) {
+				break;
+			}
+		}
+		bool mapped = _progress == Progress.READY;
+		_mutex.unlock ();
+		if (!mapped) {
+			return false;
+		}
+		return _mapped.wait_until (end_time, out value);
+	}
+
+	/**
+	 * Registers func to be called with the final value. Before the mapping
+	 * is done the callback is queued; afterwards it is handed straight to
+	 * the mapped future.
+	 */
+	public void when_done (Future.WhenDoneFunc<A> func) {
+		_mutex.lock ();
+		if (_progress == Progress.READY) {
+			_mutex.unlock ();
+			_mapped.when_done (func);
+		} else {
+			_when_done += Future.WhenDoneArrayElement<A>(func);
+			_mutex.unlock ();
+		}
+	}
+
+	/**
+	 * Runs the mapping function and publishes its result.
+	 *
+	 * Must be called with the mutex held and _progress == INIT; the mutex
+	 * is released before returning.
+	 */
+	private unowned Future<A> go_map () {
+		_progress = Progress.PROGRESS;
+		_mutex.unlock ();
+
+		Future<A> mapped = _func (_base.value);
+		unowned Future<A> result = mapped;
+
+		_mutex.lock ();
+		_mapped = (owned)mapped;
+		_progress = Progress.READY;
+		// Take the queue while still locked; nothing is appended once READY.
+		Future.WhenDoneArrayElement<A>[] when_done = (owned)_when_done;
+		_cond.broadcast ();
+		_mutex.unlock ();
+
+		// Iterate the local array: _when_done was moved out above.
+		for (int i = 0; i < when_done.length; i++) {
+			result.when_done ((owned)when_done[i].func);
+		}
+		_base = null;
+		_func = null;
+		return result;
+	}
+
+	public enum Progress {
+		INIT,
+		PROGRESS,
+		READY
+	}
+
+	private Mutex _mutex = Mutex ();
+	private Cond _cond = Cond ();
+	private Future<G> _base;
+	private Future.FlatMapFunc<A, G> _func;
+	private Future<A> _mapped;
+	private Progress _progress = Progress.INIT;
+	private Future.WhenDoneArrayElement<A>[]? _when_done = new Future.WhenDoneArrayElement<A>[0];
+}
+
diff --git a/gee/future.vala b/gee/future.vala
new file mode 100644
index 0000000..098f40a
--- /dev/null
+++ b/gee/future.vala
@@ -0,0 +1,172 @@
+/* future.vala
+ *
+ * Copyright (C) 2013 Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Author:
+ * Maciej Piechotka <uzytkownik2 gmail com>
+ */
+
+using GLib;
+
+/**
+ * Future is a value which might not yet be computed - for example it is calculated
+ * in a different thread or depends on an I/O value.
+ *
+ * All methods can be called from many threads as part of the interface.
+ *
+ * Note: A statement that a call does not block does not mean that it is lock-free.
+ * Internally the implementation is allowed to take a mutex but it should guarantee
+ * that it is not held for a long time (including blocking on anything else, I/O calls
+ * or callbacks).
+ *
+ * @see Promise
+ * @see Lazy
+ * @see task
+ * @see async_task
+ */
+[GenericAccessors]
+public interface Gee.Future<G> : Object {
+	/**
+	 * The value associated with the future. If the value is not ready,
+	 * getting it will block until it is.
+	 *
+	 * The returned value is always the same and it is alive at least as long
+	 * as the future.
+	 */
+	public virtual new G value {
+		get {
+			return wait ();
+		}
+	}
+
+	/**
+	 * Checks if the value is ready. If it is, calls to {@link wait} and
+	 * {@link wait_until} will not block and the value is returned immediately.
+	 */
+	public abstract bool ready {get;}
+
+	/**
+	 * Waits until the value is ready.
+	 *
+	 * @return The {@link value} associated with the future
+	 * @see ready
+	 * @see wait_until
+	 * @see wait_async
+	 */
+	public abstract unowned G wait ();
+
+	/**
+	 * Waits until the value is ready or the deadline has passed.
+	 *
+	 * @param end_time The time when the wait should finish
+	 * @param value The {@link value} associated with the future if the wait was successful
+	 * @return ``true`` if the value was ready within the deadline or ``false`` otherwise
+	 * @see ready
+	 * @see wait
+	 * @see wait_async
+	 */
+	public abstract bool wait_until (int64 end_time, out unowned G? value = null);
+
+	/**
+	 * Suspends the calling coroutine until the {@link value} is available.
+	 *
+	 * The coroutine is always resumed from an idle source on the default
+	 * main context, never directly from the thread completing the future.
+	 *
+	 * @return The {@link value} associated with the future
+	 * @see ready
+	 * @see wait
+	 * @see wait_until
+	 */
+	public virtual async unowned G wait_async () {
+		unowned G? result = null;
+		when_done ((value) => {
+			result = value;
+			// Never resume inline: the callback may run before the coroutine
+			// has reached its yield, or on the thread finishing the future.
+			// Idle.add () defers the resume to the default main context.
+			Idle.add (wait_async.callback);
+		});
+		yield;
+		return result;
+	}
+
+	[CCode (scope = "async")]
+	public delegate void WhenDoneFunc<G>(G value);
+
+	/**
+	 * Registers a callback which is called once the future is {@link ready}.
+	 *
+	 * Note: As the callbacks are usually called from the thread finishing
+	 * the future it is recommended not to include lengthy computation.
+	 * If one is needed please use {@link task}.
+	 *
+	 * @param func Callback receiving the {@link value}
+	 */
+	public abstract void when_done (WhenDoneFunc<G> func);
+
+	/**
+	 * Maps a future value to another value by a function and returns the
+	 * other value as a future.
+	 *
+	 * Note: As time taken by the function does not contribute to
+	 * {@link wait_until} and the implementation is allowed to compute the
+	 * value eagerly by {@link when_done} it is recommended to use
+	 * {@link task} and {@link flatMap} for longer computation.
+	 *
+	 * @param func Function applied to {@link value}
+	 * @return Future of the value returned by the function
+	 *
+	 * @see flatMap
+	 */
+	public virtual Future<A> map<A> (MapFunc<A, G> func) {
+		return new MapFuture<A, G> (this, func);
+	}
+
+	[CCode (scope = "async")]
+	public delegate Gee.Future<A> FlatMapFunc<A, G>(G value);
+
+	/**
+	 * Maps a future value to another future value which is returned (the call does not block).
+	 *
+	 * Note: As time taken by the function does not contribute to
+	 * {@link wait_until} and the implementation is allowed to compute the
+	 * value eagerly by {@link when_done} it is recommended to put the
+	 * larger computation inside the returned future, for example by
+	 * {@link task}.
+	 *
+	 * @param func Function applied to {@link value}
+	 * @return Future holding the value of the future returned by the function
+	 *
+	 * @see map
+	 */
+	public virtual Gee.Future<A> flatMap<A>(FlatMapFunc<A, G> func) {
+		return new FlatMapFuture<A, G> (this, func);
+	}
+
+	/**
+	 * Wrapper that lets {@link WhenDoneFunc} delegates be stored in an array.
+	 */
+	internal struct WhenDoneArrayElement<G> {
+		public WhenDoneArrayElement (WhenDoneFunc<G> func) {
+			this.func = func;
+		}
+		public WhenDoneFunc<G> func;
+	}
+}
+
diff --git a/gee/mapfuture.vala b/gee/mapfuture.vala
new file mode 100644
index 0000000..bbafa54
--- /dev/null
+++ b/gee/mapfuture.vala
@@ -0,0 +1,142 @@
+/* mapfuture.vala
+ *
+ * Copyright (C) 2013 Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Author:
+ * Maciej Piechotka <uzytkownik2 gmail com>
+ */
+
+/**
+ * Future returned by Future.map.
+ *
+ * It waits for the base future, applies the mapping function to its value
+ * and stores the result as its own value.
+ *
+ * The mutex guards _progress, _value and _when_done. It is never held while
+ * the mapping function or a registered callback runs.
+ */
+internal class Gee.MapFuture<A, G> : Object, Future<A> {
+	/**
+	 * Creates the mapped future.
+	 *
+	 * @param future_base Future whose value is passed to func
+	 * @param func Function producing the value of this future
+	 */
+	public MapFuture (Future<G> future_base, MapFunc<A, G> func) {
+		_base = future_base;
+		_func = func;
+		// Map eagerly once the base is ready, unless a waiter already started it.
+		_base.when_done (() => {
+			_mutex.lock ();
+			if (_progress == Progress.INIT) {
+				// go_map () releases the mutex.
+				go_map ();
+			} else {
+				_mutex.unlock ();
+			}
+		});
+	}
+
+	/**
+	 * True once the mapping function has returned and its result is stored.
+	 */
+	public bool ready {
+		get {
+			_mutex.lock ();
+			bool is_ready = _progress == Progress.READY;
+			_mutex.unlock ();
+			return is_ready;
+		}
+	}
+
+	/**
+	 * Blocks until the mapped value is available.
+	 *
+	 * @return The value returned by the mapping function
+	 */
+	public unowned A wait () {
+		_mutex.lock ();
+		if (_progress == Progress.INIT) {
+			// go_map () releases the mutex and blocks on the base value.
+			return go_map ();
+		}
+		// Loop on the predicate: condition variables may wake spuriously.
+		while (_progress != Progress.READY) {
+			_cond.wait (_mutex);
+		}
+		_mutex.unlock ();
+		return _value;
+	}
+
+	/**
+	 * Blocks until the mapped value is available or end_time has passed.
+	 *
+	 * @param end_time Monotonic deadline
+	 * @param value The mapped value if the wait was successful, null otherwise
+	 * @return true if the value became available before the deadline
+	 */
+	public bool wait_until (int64 end_time, out unowned A? value = null) {
+		value = null;
+		_mutex.lock ();
+		if (_progress == Progress.INIT) {
+			// Keep the base alive locally: go_map () on another thread clears _base.
+			Future<G> base_future = _base;
+			_mutex.unlock ();
+			// Honour the deadline while the base is still pending instead of
+			// blocking on it without limit inside go_map ().
+			if (!base_future.wait_until (end_time)) {
+				return false;
+			}
+			_mutex.lock ();
+			if (_progress == Progress.INIT) {
+				value = go_map ();
+				return true;
+			}
+		}
+		while (_progress != Progress.READY) {
+			if (!_cond.wait_until (_mutex, end_time)) {
+				break;
+			}
+		}
+		bool is_ready = _progress == Progress.READY;
+		_mutex.unlock ();
+		if (is_ready) {
+			value = _value;
+		}
+		return is_ready;
+	}
+
+	/**
+	 * Calls func with the mapped value: immediately if it is already
+	 * available, otherwise once go_map () has produced it.
+	 */
+	public void when_done (Future.WhenDoneFunc<A> func) {
+		_mutex.lock ();
+		if (_progress == Progress.READY) {
+			_mutex.unlock ();
+			func (_value);
+		} else {
+			_when_done += Future.WhenDoneArrayElement<A>(func);
+			_mutex.unlock ();
+		}
+	}
+
+	/**
+	 * Runs the mapping function, stores its result and notifies waiters
+	 * and queued callbacks.
+	 *
+	 * Must be called with the mutex held and _progress == INIT; the mutex
+	 * is released before returning.
+	 */
+	private unowned A go_map () {
+		_progress = Progress.PROGRESS;
+		_mutex.unlock ();
+
+		A tmp_value = _func (_base.value);
+		unowned A value = tmp_value;
+
+		_mutex.lock ();
+		_value = (owned)tmp_value;
+		_progress = Progress.READY;
+		// Take the queue while still locked; nothing is appended once READY.
+		Future.WhenDoneArrayElement<A>[] when_done = (owned)_when_done;
+		_cond.broadcast ();
+		_mutex.unlock ();
+
+		for (int i = 0; i < when_done.length; i++) {
+			when_done[i].func (value);
+		}
+		_base = null;
+		_func = null;
+		return value;
+	}
+
+	private enum Progress {
+		INIT,
+		PROGRESS,
+		READY
+	}
+
+	private Mutex _mutex = Mutex ();
+	private Cond _cond = Cond ();
+	private Future<G> _base;
+	private MapFunc<A, G> _func;
+	private A _value;
+	private Progress _progress = Progress.INIT;
+	private Future.WhenDoneArrayElement<A>[]? _when_done = new Future.WhenDoneArrayElement<A>[0];
+}
+
diff --git a/gee/promise.vala b/gee/promise.vala
new file mode 100644
index 0000000..806e7d8
--- /dev/null
+++ b/gee/promise.vala
@@ -0,0 +1,123 @@
+/* promise.vala
+ *
+ * Copyright (C) 2013 Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Author:
+ * Maciej Piechotka <uzytkownik2 gmail com>
+ */
+
+using GLib;
+
+/**
+ * Promise allows setting a value with an associated {@link Future}. Please note that
+ * the value can be stored only once.
+ *
+ * Typically the producer will create a promise and return its {@link future} while
+ * keeping the promise to itself. Then when the value is ready it can call {@link set_value}.
+ */
+public class Gee.Promise<G> {
+	/**
+	 * {@link Future} value of this promise
+	 */
+	public Gee.Future<G> future {
+		get {
+			return _future;
+		}
+	}
+
+	/**
+	 * Sets the value of the future.
+	 *
+	 * @param value Value of future
+	 */
+	public void set_value (owned G value) {
+		// Move the value instead of duplicating it on the way in.
+		_future.set_value ((owned)value);
+	}
+
+	/**
+	 * Future backed by a mutex/condition pair. _mutex guards _ready, _value
+	 * and _when_done; callbacks are always invoked with the mutex released.
+	 */
+	private class Future<G> : Object, Gee.Future<G> {
+		public bool ready {
+			get {
+				_mutex.lock ();
+				bool result = _ready;
+				_mutex.unlock ();
+				return result;
+			}
+		}
+
+		public unowned G wait () {
+			_mutex.lock ();
+			// Loop on the predicate: condition variables may wake spuriously.
+			while (!_ready) {
+				_set.wait (_mutex);
+			}
+			_mutex.unlock ();
+			return _value;
+		}
+
+		public bool wait_until (int64 end_time, out unowned G? value = null) {
+			_mutex.lock ();
+			while (!_ready) {
+				if (!_set.wait_until (_mutex, end_time)) {
+					break;
+				}
+			}
+			// Re-read the flag: the value may have arrived right at the deadline.
+			bool result = _ready;
+			_mutex.unlock ();
+			if (result) {
+				value = _value;
+			} else {
+				value = null;
+			}
+			return result;
+		}
+
+		public void when_done (Gee.Future.WhenDoneFunc<G> func) {
+			_mutex.lock ();
+			if (_ready) {
+				_mutex.unlock ();
+				func (_value);
+			} else {
+				_when_done += Gee.Future.WhenDoneArrayElement<G>(func);
+				_mutex.unlock ();
+			}
+		}
+
+		/**
+		 * Stores the value, wakes all waiters and runs the queued callbacks.
+		 * Asserts that no value has been stored before.
+		 */
+		internal void set_value (owned G value) {
+			unowned G stored = value;
+
+			_mutex.lock ();
+			assert (!_ready);
+			_value = (owned)value;
+			_ready = true;
+			// Take the queue while still locked; nothing is appended once _ready is set.
+			Gee.Future.WhenDoneArrayElement<G>[] when_done = (owned)_when_done;
+			_set.broadcast ();
+			_mutex.unlock ();
+
+			for (int i = 0; i < when_done.length; i++) {
+				when_done[i].func (stored);
+			}
+		}
+
+		private Mutex _mutex = Mutex ();
+		private Cond _set = Cond ();
+		private G _value;
+		private bool _ready;
+		private Gee.Future.WhenDoneArrayElement<G>[]? _when_done = new Gee.Future.WhenDoneArrayElement<G>[0];
+	}
+
+	private Future<G> _future = new Future<G>();
+}
+
diff --git a/gee/task.vala b/gee/task.vala
new file mode 100644
index 0000000..bbcd769
--- /dev/null
+++ b/gee/task.vala
@@ -0,0 +1,93 @@
+/* task.vala
+ *
+ * Copyright (C) 2013 Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Author:
+ * Maciej Piechotka <uzytkownik2 gmail com>
+ */
+
+namespace Gee {
+	public delegate G Task<G>();
+
+	/**
+	 * Schedules a task to execute asynchronously. Internally one
+	 * of the threads from the pool will execute the task.
+	 *
+	 * Note: There is a limited number of threads unless the environment variable
+	 * ``GEE_NUM_THREADS`` is set to -1. It is not advised to call I/O or
+	 * block inside the task. If necessary it is possible to create a new one
+	 * by another call.
+	 *
+	 * @param task Task to be executed
+	 * @return Future value returned by task
+	 * @see async_task
+	 */
+	public Future<G> task<G>(Task<G> task) throws GLib.ThreadError {
+		TaskData<G> tdata = new TaskData<G>();
+		// NOTE(review): task is borrowed here but is invoked later on a pool
+		// thread - confirm that callers keep the closure alive until then.
+		tdata.function = task;
+		tdata.promise = new Promise<G>();
+		// Grab the future before tdata is handed over to the pool.
+		Future<G> result = tdata.promise.future;
+		TaskData.get_async_pool ().add ((owned)tdata);
+		return result;
+	}
+
+	/**
+	 * Continues the execution asynchronously in a helper thread. Internally
+	 * one of the threads from the pool will execute the rest of the caller.
+	 *
+	 * Note: There is a limited number of threads unless the environment variable
+	 * ``GEE_NUM_THREADS`` is set to -1. It is not advised to call I/O or
+	 * block inside the task. If necessary it is possible to create a new one
+	 * by another call.
+	 *
+	 * @see task
+	 */
+	public async void async_task() throws GLib.ThreadError {
+		task<bool>(async_task.callback);
+		// Suspend here; the pool thread resumes the coroutine through the
+		// callback scheduled above.
+		yield;
+	}
+
+	/**
+	 * Work item queued on the shared thread pool: the function to run and
+	 * the promise that receives its result.
+	 */
+	[Compact]
+	internal class TaskData<G> {
+		[CCode (scope = "async")]
+		public Task<G> function;
+		public Promise<G> promise;
+		public void run() {
+			promise.set_value(function());
+		}
+		private static GLib.Once<ThreadPool<TaskData>> async_pool;
+		/**
+		 * Returns the shared pool, creating it on first use. The pool size
+		 * defaults to the number of processors and can be overridden with
+		 * ``GEE_NUM_THREADS`` (-1 means unlimited).
+		 */
+		internal static unowned ThreadPool<TaskData> get_async_pool () {
+			return async_pool.once(() => {
+				int num_threads = (int)GLib.get_num_processors ();
+				string? gee_num_threads_str = Environment.get_variable("GEE_NUM_THREADS");
+				if (gee_num_threads_str != null) {
+					int64 requested;
+					// Accept only -1 (unlimited) or a positive count that fits an int;
+					// 0 or an overflowing value would leave a pool that runs nothing.
+					if (int64.try_parse (gee_num_threads_str, out requested)
+					    && (requested == -1 || (requested > 0 && requested <= int.MAX))) {
+						num_threads = (int)requested;
+					}
+				}
+				try {
+					return new ThreadPool<TaskData>.with_owned_data((tdata) => {
+						tdata.run();
+					}, num_threads, false);
+				} catch (ThreadError err) {
+					// Without a pool no task can ever run.
+					Process.abort ();
+				}
+			});
+		}
+	}
+}
+
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]