Wednesday 15 February 2012

json - Why doesn't a multithreaded, memory allocate/deallocate-intensive application scale with the number of threads?




Notice:

The original post title

Why doesn't a multithreaded JSON parser using DWScript scale with the number of threads?

was changed because the problem is not related to processing JSON info with DWScript. The problem is in the default memory manager of Delphi XE2 and XE7 (tested with XE2 and a trial of XE7). It just appeared first in this type of application.

I have a multithreaded Win32/Win64 VCL application in Delphi XE2 that processes JSON info.

Each thread parses the JSON info using tdwsjsonvalue.parsestring(sjson) from DWScript, reads the values using DWScript methods, and stores the results as records.

For testing purposes, I process the same JSON info in each thread.

A single-thread run takes N seconds to process the data within the thread. Increasing the number of threads to M also increases the time each single thread needs to process the same data, and it does so linearly (approx. M * N).

As a result, there is no speed improvement. The other parts of the application (JSON info delivery, storing results in the target environment) scale as expected.

What could be the reason? Any ideas are appreciated.

Supplemental information:

Tested on Win7/32, Win7/64 and Win8/64, on 2-core to 12-core systems (with and without HT).

DWScript was chosen as the fastest one available (I tested a bunch, among them SuperObject and the built-in Delphi one). The behavior is similar when using only the JSON unit from DWS.

Below is a complete console app illustrating the problem. To run it you need the sample JSON info available here: https://www.dropbox.com/s/4iuv87ytpcdugk6/json1.zip?dl=0. The archive contains the info file json1.dat for the first thread. For more threads (up to 16), copy json1.dat to json2.dat...json16.dat.

The program and the info files should be in the same folder. To run: convert.exe N, where N is the number of threads.

The program writes execution times in msecs to stdout: the time spent in the thread, the time spent parsing the info, and the time spent releasing (destroying) the tdwsjsonvalue object. The statement _dwsjvdata.destroy; does not scale.

program convert;

{$apptype console}
{$r *.res}

uses
  system.sysutils,
  system.diagnostics,
  system.classes,
  dwsjson in 'dwsjson.pas',
  dwsstrings in 'dwsstrings.pas',
  dwsutils in 'dwsutils.pas',
  dwsxplatform in 'dwsxplatform.pas';

type
  tworkerthread = class (tthread)
  private
    _iuid: integer;
    _swwatch: tstopwatch;
    _lrunning: boolean;
    _sfilejsondata: string;
    _fjsondata: textfile;
  protected
    constructor create (auid: integer);
    procedure execute; override;
  published
    property running: boolean read _lrunning;
  end;

  tconverter = class (tobject)
  private
    _swwatch0, _swwatch1, _swwatch2: tstopwatch;
    _dwsjvdata: tdwsjsonvalue;
  protected
    constructor create;
    destructor destroy; override;
    function calculate (auid: integer; ajsondata: string; var aparse, adestroy: integer): integer;
  end;

const
  max_threads = 16;

var
  ihowmany: integer;
  athworker: array [1..max_threads] of pointer;
  aielapsed: array [1..max_threads] of integer;
  aielapsedparse: array [1..max_threads] of integer;
  aielapseddestroy: array [1..max_threads] of integer;
  aifares: array [1..max_threads] of integer;
  swwatcht, swwatchp: tstopwatch;

constructor tworkerthread.create (auid: integer);
begin
  inherited create (true);
  _iuid := auid;
  _swwatch := tstopwatch.create;
  // each thread reads its own copy of the data: json1.dat, json2.dat, ...
  _sfilejsondata := extractfilepath (paramstr (0)) + 'json' + trim (inttostr (_iuid)) + '.dat';
  _lrunning := false;
  suspended := false;
end;

procedure tworkerthread.execute;
var
  j: integer;
  sline: string;
  sllines: tstringlist;
  os: tconverter;
begin
  _lrunning := true;
  os := tconverter.create;
  sllines := tstringlist.create;
  system.assignfile (_fjsondata, _sfilejsondata);
  system.reset (_fjsondata);
  j := 0;
  repeat
    system.readln (_fjsondata, sline);
    sllines.add (sline);
    inc (j);
  until (j = 50);
  // until (system.eof (_fjsondata));
  system.close (_fjsondata);
  sleep (1000);
  // only the parsing/destroying loop below is timed
  _swwatch.reset;
  _swwatch.start;
  aifares [_iuid] := 0;
  aielapsedparse [_iuid] := 0;
  aielapseddestroy [_iuid] := 0;
  for j := 1 to sllines.count do
    aifares [_iuid] := aifares [_iuid] + os.calculate (_iuid, sllines.strings [j - 1], aielapsedparse [_iuid], aielapseddestroy [_iuid]);
  _swwatch.stop;
  sllines.free;
  os.destroy;
  aielapsed [_iuid] := _swwatch.elapsedmilliseconds;
  _lrunning := false;
end;

constructor tconverter.create;
begin
  inherited create;
  _swwatch0 := tstopwatch.create;
  _swwatch1 := tstopwatch.create;
  _swwatch2 := tstopwatch.create;
end;

destructor tconverter.destroy;
begin
  inherited;
end;

function tconverter.calculate (auid: integer; ajsondata: string; var aparse, adestroy: integer): integer;
var
  jfare, jtotalfares, ielapsedparse, ielapseddestroy, ielapsedtotal: integer;
begin
  jtotalfares := 0; // keeps the result defined when the data is not an array
  _swwatch0.reset;
  _swwatch0.start;
  // time the parse
  _swwatch1.reset;
  _swwatch1.start;
  _dwsjvdata := tdwsjsonvalue.parsestring (ajsondata);
  _swwatch1.stop;
  ielapsedparse := _swwatch1.elapsedmilliseconds;
  if (_dwsjvdata.valuetype = jvtarray) then
  begin
    _swwatch2.reset;
    _swwatch2.start;
    jtotalfares := _dwsjvdata.elementcount;
    for jfare := 0 to (jtotalfares - 1) do
      if (_dwsjvdata.elements [jfare].valuetype = jvtobject) then
      begin
        _swwatch1.reset;
        _swwatch1.start;
        _swwatch1.stop;
      end;
  end;
  // time the release of the whole JSON tree: this is the part that does not scale
  _swwatch1.reset;
  _swwatch1.start;
  _dwsjvdata.destroy;
  _swwatch1.stop;
  ielapseddestroy := _swwatch1.elapsedmilliseconds;
  _swwatch0.stop;
  ielapsedtotal := _swwatch0.elapsedmilliseconds;
  inc (aparse, ielapsedparse);
  inc (adestroy, ielapseddestroy);
  result := jtotalfares;
end;

procedure multithreadstart;
var
  j: integer;
begin
  for j := 1 to ihowmany do
    if (athworker [j] = nil) then
    begin
      athworker [j] := tworkerthread.create (j);
      tworkerthread (athworker [j]).freeonterminate := false;
      tworkerthread (athworker [j]).priority := tpnormal;
    end;
end;

procedure multithreadstop;
var
  j: integer;
begin
  for j := 1 to max_threads do
    if (athworker [j] <> nil) then
    begin
      tworkerthread (athworker [j]).terminate;
      tworkerthread (athworker [j]).waitfor;
      tworkerthread (athworker [j]).free;
      athworker [j] := nil;
    end;
end;

procedure prologue;
var
  j: integer;
begin
  ihowmany := strtoint (paramstr (1));
  for j := 1 to max_threads do
    athworker [j] := nil;
  swwatcht := tstopwatch.create;
  swwatcht.reset;
  swwatchp := tstopwatch.create;
  swwatchp.reset;
end;

procedure runconvert;

  function __isrunning: boolean;
  var
    j: integer;
  begin
    result := false;
    for j := 1 to max_threads do
      result := result or ((athworker [j] <> nil) and tworkerthread (athworker [j]).running);
  end;

begin
  swwatcht.start;
  multithreadstart;
  sleep (1000);
  while (__isrunning) do
    sleep (500);
  multithreadstop;
  swwatcht.stop;
  writeln (#13#10, 'total time:', swwatcht.elapsedmilliseconds);
end;

procedure epilogue;
var
  j: integer;
begin
  for j := 1 to ihowmany do
    writeln (#13#10, 'thread # ', j, ' tot.time:', aielapsed [j], ' fares:', aifares [j], ' tot.parse:', aielapsedparse [j], ' tot.destroy:', aielapseddestroy [j]);
  readln;
end;

begin
  try
    prologue;
    runconvert;
    epilogue;
  except
    on e: exception do
      writeln (e.classname, ': ', e.message);
  end;
end.
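The notice at the top attributes the slowdown to the default memory manager rather than to DWScript. One way to check this without the JSON data is a stripped-down program whose threads only allocate and free small strings and objects. This is a hypothetical sketch and not part of the original post. The program name, the iteration count and the workload are all assumptions.

```pascal
program allocbench;

{$APPTYPE CONSOLE}

// Hypothetical minimal benchmark (not from the original post): the worker
// threads share no data and only allocate/free small strings and objects,
// so any per-thread slowdown as the thread count grows points at the
// memory manager rather than at the JSON parser.

uses
  System.SysUtils, System.Classes, System.Diagnostics;

const
  Iterations = 1000000; // assumed workload size, tune for the machine

type
  TAllocWorker = class(TThread)
  protected
    procedure Execute; override;
  public
    ElapsedMs: Int64; // per-thread time, read by the main thread after WaitFor
  end;

procedure TAllocWorker.Execute;
var
  SW: TStopwatch;
  I: Integer;
  S: string;
  SL: TStringList;
begin
  SW := TStopwatch.StartNew;
  for I := 1 to Iterations do
  begin
    S := IntToStr(I) + 'abc'; // allocates new strings; the previous value of S is released
    SL := TStringList.Create; // small object allocation
    SL.Add(S);                // grows the list's internal array (another allocation)
    SL.Free;                  // frees the array and the object, drops the string reference
  end;
  SW.Stop;
  ElapsedMs := SW.ElapsedMilliseconds;
end;

var
  Workers: array of TAllocWorker;
  N, J: Integer;
begin
  N := StrToIntDef(ParamStr(1), 1); // number of threads, default 1
  SetLength(Workers, N);
  for J := 0 to N - 1 do
    Workers[J] := TAllocWorker.Create(False); // threads start running immediately
  for J := 0 to N - 1 do
  begin
    Workers[J].WaitFor;
    Writeln('thread # ', J + 1, ' time (ms): ', Workers[J].ElapsedMs);
    Workers[J].Free;
  end;
end.
```

Run it as allocbench.exe N for increasing N. The threads do not share any data. If each thread's time still grows with N, the contention is in the memory manager and not in the parser.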

Have you tried a scalable memory manager? Delphi (with FastMM internally) does not scale well with strings and other memory-related operations: https://scalemm.googlecode.com/files/scalemm_v2_4_1.zip
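A replacement memory manager for Delphi is usually activated by making its unit the first entry in the project's uses clause. That way it is installed before any other unit allocates memory. The fragment below sketches this for the convert program above. It assumes the unit shipped in the ScaleMM archive is named ScaleMM2, which is an assumption, so check the file name inside the zip.

```pascal
program convert;

{$apptype console}
{$r *.res}

uses
  ScaleMM2, // assumed unit name from the ScaleMM v2 archive; must be listed first
  system.sysutils,
  system.diagnostics,
  system.classes,
  dwsjson in 'dwsjson.pas',
  dwsstrings in 'dwsstrings.pas',
  dwsutils in 'dwsutils.pas',
  dwsxplatform in 'dwsxplatform.pas';

// ... rest of the program unchanged ...
```

After rebuilding, rerun convert.exe with different thread counts. Then compare the tot.destroy column against the run with the default memory manager.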

Also, try both modes of this profiler to see which part is the bottleneck: https://code.google.com/p/asmprofiler/

json multithreading delphi dwscript
