[gxml] StreamReader: implement read_buffer_async()
- From: Daniel Espinosa Ortiz <despinosa src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gxml] StreamReader: implement read_buffer_async()
- Date: Tue, 30 Jul 2019 19:08:24 +0000 (UTC)
commit a4038f05c0bbcbc5a7d7597e4511efdfe840fd4f
Author: Daniel Espinosa <esodan gmail com>
Date: Tue Jul 30 14:04:09 2019 -0500
StreamReader: implement read_buffer_async()
Fix issue #27
This method creates a new ThreadPool and
queues each child element on it, so that a
worker thread calls read_from_string() on the
element using its cached read_buffer string.
gxml/Element.vala | 42 +++++++++++++++++--
gxml/StreamReader.vala | 3 --
...reamReaderPerformanceAsyncReadUnparsedTest.vala | 49 ++++++++++------------
...amReaderPerformanceIterateReadUnparsedTest.vala | 14 +------
test/StreamReaderTest.vala | 30 ++++++++++++-
5 files changed, 91 insertions(+), 47 deletions(-)
---
diff --git a/gxml/Element.vala b/gxml/Element.vala
index 8782a9a..5e0aff2 100644
--- a/gxml/Element.vala
+++ b/gxml/Element.vala
@@ -809,20 +809,56 @@ public class GXml.Element : GXml.Node,
if (read_buffer == null) {
return;
}
- read_from_string ((string) read_buffer.data);
- read_buffer = null;
foreach (DomNode n in child_nodes) {
if (n is GXml.Element) {
((GXml.Element) n).parse_buffer ();
}
}
+ read_from_string ((string) read_buffer.data);
+ read_buffer = null;
+ }
+
+ ThreadPool<Element> pool = null;
+
+ /**
+ * Monitor multi-threading parsing
+ */
+ public uint parse_pending () {
+ if (pool == null) {
+ return 0;
+ }
+ return pool.unprocessed ();
}
/**
* Asynchronically parse {@link read_buffer}
*/
public async void parse_buffer_async () throws GLib.Error {
- parse_buffer ();
+ if (read_buffer == null) {
+ return;
+ }
+ uint nth = GLib.get_num_processors ();
+ if (nth > 1) {
+ nth = (uint) (nth - 1);
+ }
+ pool = new ThreadPool<Element>.with_owned_data ((e)=>{
+ try {
+ if (e.read_buffer != null) {
+ e.read_from_string ((string) e.read_buffer.data);
+ e.read_buffer = null;
+ }
+ } catch (GLib.Error err) {
+ warning (_("Error parsing child's buffer: %s"), err.message);
+ }
+ }, (int) nth, false);
+ foreach (DomNode n in child_nodes) {
+ if (n is GXml.Element) {
+ pool.add ((GXml.Element) n);
+ }
+ }
+ //while (lpool.unprocessed () != 0);
+ read_from_string ((string) read_buffer.data);
+ read_buffer = null;
}
}
diff --git a/gxml/StreamReader.vala b/gxml/StreamReader.vala
index c7283f2..127ffda 100644
--- a/gxml/StreamReader.vala
+++ b/gxml/StreamReader.vala
@@ -220,7 +220,6 @@ public class GXml.StreamReader : GLib.Object {
}
} else if (children && parent == null) {
GXml.Element ce = read_element (false, e);;
- message ("Parsing node: %s", ce.local_name);
var col = root_collections.get (ce.local_name.down ());
if (col != null) {
var cobj = GLib.Object.new (col.items_type,
@@ -228,9 +227,7 @@ public class GXml.StreamReader : GLib.Object {
cobj.read_buffer = ce.read_buffer;
e.append_child (cobj);
col.append (cobj);
- message ("Added node: %s to %s", cobj.local_name, col.get_type ().name ());
} else {
- message ("Searching node property");
foreach (ParamSpec pspec in
(e as GXml.Object).get_property_element_list ()) {
if (pspec.value_type.is_a (typeof (Collection))) continue;
diff --git a/test/StreamReaderPerformanceAsyncReadUnparsedTest.vala
b/test/StreamReaderPerformanceAsyncReadUnparsedTest.vala
index c84e2ad..b06cf7a 100644
--- a/test/StreamReaderPerformanceAsyncReadUnparsedTest.vala
+++ b/test/StreamReaderPerformanceAsyncReadUnparsedTest.vala
@@ -28,43 +28,38 @@ class GXmlTest.Suite : GLib.Object
GLib.Intl.setlocale (GLib.LocaleCategory.ALL, "");
Test.init (ref args);
Test.add_func ("/gxml/stream-reader/performance", () => {
+ File dir = File.new_for_path (GXmlTestConfig.TEST_DIR);
+ assert (dir.query_exists ());
+ File f = File.new_for_uri (dir.get_uri ()+"/test-large.xml");
+ assert (f.query_exists ());
var loop = new MainLoop (null);
- var timer = new Timer ();
- ulong time = 0;
try {
- File dir = File.new_for_path (GXmlTestConfig.TEST_DIR);
- assert (dir.query_exists ());
- File f = File.new_for_uri (dir.get_uri ()+"/test-large.xml");
- assert (f.query_exists ());
var sr = new GXml.StreamReader (f.read ());
+ Test.timer_start ();
var d = sr.read ();
- timer.elapsed (out time);
- message ("Initial Parse: %lu ms for %d nodes", time / 1000, d.document_element.child_nodes.length);
- Timeout.add_full (0, 10, ()=>{
- int l = d.document_element.child_nodes.item (5000).child_nodes.length;
- if (l == 0) {
+ message ("Initial Parse: %g sec for %d nodes", Test.timer_elapsed (),
d.document_element.child_nodes.length);
+ Test.timer_start ();
+ Idle.add (()=>{
+ (d.document_element as GXml.Element).parse_buffer_async.begin ((obj, res)=>{
+ try {
+ (d.document_element as GXml.Element).parse_buffer_async.end (res);
+ } catch (GLib.Error e) {
+ warning ("Error: %s", e.message);
+ }
+ });
+ if (d.document_element.child_nodes.item (10079).child_nodes.length == 0) {
return Source.CONTINUE;
}
- try {
- message ((d.document_element.child_nodes.item (5000) as DomElement).write_string ());
- } catch (GLib.Error e) {
- warning ("Error: %s", e.message);
+ if ((d.document_element as GXml.Element).parse_pending () != 0) {
+ return Source.CONTINUE;
}
+ message ("Pending to parse: %u", (d.document_element as GXml.Element).parse_pending ());
+ message ("Parsed buffers: %g sec", Test.timer_elapsed ());
+ assert (d.document_element.child_nodes.item (10079) is GXml.Element);
+ assert (d.document_element.child_nodes.item (10079).child_nodes.length != 0);
loop.quit ();
return Source.REMOVE;
});
- Idle.add (()=>{
- (d.document_element as GXml.Element).parse_buffer_async.begin ((obj, res)=>{
- try {
- (d.document_element as GXml.Element).parse_buffer_async.end (res);
- } catch (GLib.Error e) {
- warning ("Error: %s", e.message);
- }
- timer.elapsed (out time);
- message ("Parse root: %lu ms", time / 1000);
- });
- return Source.REMOVE;
- });
} catch (GLib.Error e) {
warning ("Error: %s", e.message);
}
diff --git a/test/StreamReaderPerformanceIterateReadUnparsedTest.vala
b/test/StreamReaderPerformanceIterateReadUnparsedTest.vala
index 299afb2..1613991 100644
--- a/test/StreamReaderPerformanceIterateReadUnparsedTest.vala
+++ b/test/StreamReaderPerformanceIterateReadUnparsedTest.vala
@@ -40,24 +40,12 @@ class GXmlTest.Suite : GLib.Object
var d = sr.read ();
timer.elapsed (out time);
message ("Initial Parse: %lu ms for %d nodes", time / 1000, d.document_element.child_nodes.length);
- Timeout.add_full (0, 10, ()=>{
- int l = d.document_element.child_nodes.item (5000).child_nodes.length;
- if (l == 0) {
- return Source.CONTINUE;
- }
- try {
- message ((d.document_element.child_nodes.item (5000) as DomElement).write_string ());
- } catch (GLib.Error e) {
- warning ("Error: %s", e.message);
- }
- loop.quit ();
- return Source.REMOVE;
- });
Idle.add (()=>{
try {
(d.document_element as GXml.Element).parse_buffer ();
timer.elapsed (out time);
message ("Parse root: %lu ms", time / 1000);
+ loop.quit ();
} catch (GLib.Error e) {
warning ("Error: %s", e.message);
assert_not_reached ();
diff --git a/test/StreamReaderTest.vala b/test/StreamReaderTest.vala
index 1e5c716..cdd9513 100644
--- a/test/StreamReaderTest.vala
+++ b/test/StreamReaderTest.vala
@@ -205,7 +205,7 @@ class GXmlTest {
warning ("Error: %s", e.message);
}
});
- Test.add_func ("/gxml/stream-reader/child-multiple/read-unparsed", () => {
+ Test.add_func ("/gxml/stream-reader/child-multiple/read-unparsed/sync", () => {
var loop = new GLib.MainLoop (null);
Idle.add (()=>{
string str = """<root p1="a" p2="b" ><child k="p" y="9"><code/><code
u="3">TestC</code><Tek/><Tex y="456"/></child></root>""";
@@ -222,6 +222,34 @@ class GXmlTest {
}
return Source.REMOVE;
});
+ loop.run ();
+ });
+ Test.add_func ("/gxml/stream-reader/child-multiple/read-unparsed/async", () => {
+ var loop = new GLib.MainLoop (null);
+ Idle.add (()=>{
+ string str = """<root p1="a" p2="b" ><child k="p" y="9"><code/><code
u="3">TestC</code><Tek/><Tex y="456"/></child></root>""";
+ var istream = new MemoryInputStream.from_data (str.data, null);
+ var sr = new StreamReader (istream);
+ try {
+ var doc = sr.read ();
+ (doc.document_element as GXml.Element).parse_buffer_async.begin
((obj, res)=>{
+ try {
+ (doc.document_element as
GXml.Element).parse_buffer_async.end (res);
+ message (doc.write_string ());
+ assert ((doc.document_element as
GXml.Element).read_buffer == null);
+ } catch (GLib.Error e) {
+ warning ("Error while reading stream: %s", e.message);
+ }
+ });
+ if ((doc.document_element as GXml.Element).parse_pending () != 0) {
+ return Source.CONTINUE;
+ }
+ loop.quit ();
+ } catch (GLib.Error e) {
+ warning ("Error while reading stream: %s", e.message);
+ }
+ return Source.REMOVE;
+ });
loop.run ();
});
Test.add_func ("/gxml/stream-reader/serialization", () => {
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]