I figured out that the slowdown came from roughly 20,000 lines of TTL (Turtle) data that Jena could hardly parse.

With e.g. 4,000 lines of TTL data, it parses much faster.

So the lesson is: don't feed about 20k lines at once to Jena when parsing TTL.

Parse time does not grow linearly with the line count. There is some nonlinear behaviour, so at that size it looks as if the task never completes.
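
To check how parse time really grows with the line count, a small timing helper like the one below could be used. This is only a sketch: `ParseTiming`, `timeMs` and `measure` are made-up names, and `parse` stands for whatever call hands the text to Jena.

```scala
object ParseTiming {
  // Wall-clock time of one call, in milliseconds.
  def timeMs(block: => Unit): Double = {
    val start = System.nanoTime()
    block
    (System.nanoTime() - start) / 1e6
  }

  // For each size, parse the first `size` lines and record (size, elapsedMs).
  // `parse` is a placeholder for the real parser call.
  def measure(lines: Seq[String], sizes: Seq[Int])(parse: String => Unit): Seq[(Int, Double)] =
    sizes.map { n =>
      val text = lines.take(n).mkString("\n")
      (n, timeMs(parse(text)))
    }
}
```

Running it with sizes such as 1000, 2000, 4000, 8000 and comparing how the time changes each time the size doubles would show whether the growth is roughly linear or clearly worse.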

Now I have set up a cluster with 1 task node, and it already seems to have processed a lot of partitions very fast. To get there I had to optimize a few things, mainly removing the row sorting.

E.g. this was used to get the sorted text of each partition into storage:

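import org.apache.spark.sql.functions.{col, collect_list, concat_ws, sort_array, struct}

// Collect (rowId, value) pairs per partition, sort them by rowId,
// then join the values into one newline-separated text column.
val grouped_df = (
  df
    .groupBy("partitionId")
    .agg(
      sort_array(collect_list(struct("rowId", "value")))
        .alias("collected_list")
    )
    .withColumn("data", concat_ws("\n", col("collected_list.value")))
    .drop("collected_list")
)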


This slowed things down a lot, so I moved to this method:

// `a` is assumed to be the index of the partition being processed.
val partition = s"partitionId=${BigInt(a) * BigInt(2)}0000"
val df = spark.read.parquet(s"s3://ontology-******/table_v10_test/raw_table/$partition")
// Write the values as plain text, without any sorting.
df.select("value").rdd.map(_.getString(0)).saveAsTextFile(s"s3://ontology-**********/table_v106_text/raw_table_text/$partition/text_data.txt")

// Read the text back as one record by using a separator that never occurs in the data, then split it into lines.
// Spark yields at least one record per part file, so this takes only the first record.
val textData = spark.read.option("lineSep", ",,,,,,,,,,,,,,,").text(s"s3://ontology-************/table_v106_text/raw_table_text/$partition/text_data.txt").head().getString(0).split("\n")


Here I also needed to split the text data into 5 parts of 4,000 rows each, and then called the Jena parser on each part.
Currently this runs iteratively, and it works very fast.
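
The splitting step could look roughly like this. It is a minimal sketch, not the exact code used here: `TtlChunker`, `chunks` and the default of 4000 are illustrative names and values. It assumes every statement sits on its own line, since cutting a multi-line Turtle statement at a chunk border would break parsing. `@prefix` and `@base` lines are copied to the front of every chunk so that each chunk can still be parsed on its own.

```scala
object TtlChunker {
  // Directive lines that every chunk needs in order to be parsed alone.
  private def isHeader(line: String): Boolean = {
    val t = line.trim.toLowerCase
    t.startsWith("@prefix") || t.startsWith("@base")
  }

  // Split the lines into chunks of at most `chunkSize` body lines,
  // each prefixed with all header lines of the input.
  def chunks(lines: Seq[String], chunkSize: Int = 4000): Vector[String] = {
    val (header, body) = lines.partition(isHeader)
    body.grouped(chunkSize).map(group => (header ++ group).mkString("\n")).toVector
  }
}
```

Each chunk can then be handed to Jena one after the other, for example with `model.read(new java.io.StringReader(chunk), null, "TTL")` on a Jena `Model`.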

I think I will also add some tumbling window method.
But it seems this TTL table is finally going to be resolved.
The main problems seem to have been:

First, sorting was taking a long time (it was needed to get the records in the correct order).
Second, 20k lines turned out to be too much for Apache Jena to process in one go.
It does not seem to behave like an O(n) implementation; it becomes disproportionately inefficient once the data reaches that size.

So now it is blazing fast with these methods. At some point it even goes non-Spark-like: a single task executor process runs 5 threads, each doing separate Spark jobs, and the partitions are processed sequentially. Still, it is currently much faster than the Spark-style method, maybe because this Zeppelin setup has some Spark integration problem.
Either way, iterative sequential processing is fast enough to process all the data, so that does not matter much.
I mean the solution is not fully distributed; it is mostly sequential processing of this partitioned data.
With the Spark approach, the rows have to be converted to text, which always needs sorting, and that adds unnecessary time and complexity.
The text-writing part could also be run as a Spark job later if I wanted to enforce distributed processing, but that is not a concern. What matters is generating the TTL table, whether it is generated fully distributed or not (as is the case now).

I don't want to spend more time on this task, so at this stage I would rather run it on a 1-task-node Spark configuration with this non-distributed logic.
Afterwards I can switch back to Spark's distributed magic. For now I would rather keep this machine running the task until morning and judge by checking its velocity.

Aha, I checked the velocity and decided to move to distributed logic after all, but with these additional methods.
There is just too much data. I need to convert this back to distributed logic and spawn it on a cluster.
But this time with no sorting.
One unpleasant point is that the read did not pick up the partitionId column, so I would need to recreate it as a secondary column.
Then I would distribute the work partition-wise, with some logic like the following (needed since Spark did not recognize the partition folder structure; it normally does, but somehow did not in this setup):

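import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

for (index <- 1 to partitionCount) {
  // Read one partition folder directly, since partition discovery did not pick up partitionId.
  val df = spark.read.parquet(s"s3://ontology-****/table_v10_test/raw_table/partitionId=${BigInt(index) * BigInt(2)}0000")
  // Compute and save in a Scala Future (e.g. 5 threads, each processing 4k lines of data).
  // processPartition is a placeholder name for that compute/save step.
  val job = Future { processPartition(df) }
  // Wait for completion for at most 60 seconds, then time out and continue the loop.
  Try(Await.result(job, 60.seconds))
}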

Currently I don't use a timeout and process iteratively on this single-node cluster.
Later I would arrange a controlled assignment of partition tasks to the cluster,
checking how many unfinished applications are still running before adding a new Spark task in this loop.

I wonder whether it is possible to get the count of running Spark YARN applications from Spark in Zeppelin.

E.g. if I spawn a 50-node cluster, I just need to keep at most about 200 applications running at any one time.
So the loop above cannot keep adding partition tasks continuously. It needs to be controlled: add 200 Spark application tasks (with the fair scheduler),
then wait for the number of running applications to drop below 200 before adding more, keeping it around 200.

This ensures that at most 200 partitions are processed at a time and the cluster is not flooded with tasks.
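
One way to cap the number of jobs in flight, without asking YARN for the application count, is a semaphore inside the submitting loop. The sketch below is an in-process substitute for the "check running applications" idea, not that idea itself. `ThrottledSubmitter`, `runAll` and `work` are made-up names, and `work` stands for whatever processes one partition.

```scala
import java.util.concurrent.{Executors, Semaphore}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ThrottledSubmitter {
  // Runs `work` for every partition id, never letting more than
  // `maxInFlight` of them run at the same time.
  // Returns the highest number of jobs observed running concurrently.
  def runAll(partitionIds: Seq[Int], maxInFlight: Int)(work: Int => Unit): Int = {
    val pool = Executors.newCachedThreadPool()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val permits = new Semaphore(maxInFlight)
    val lock = new Object
    var running = 0
    var peak = 0

    val jobs = partitionIds.map { id =>
      // Blocks the submitting loop while maxInFlight jobs are still running.
      permits.acquire()
      Future {
        try {
          lock.synchronized { running += 1; if (running > peak) peak = running }
          work(id)
        } finally {
          lock.synchronized { running -= 1 }
          permits.release()
        }
      }
    }
    try Await.result(Future.sequence(jobs), Duration.Inf)
    finally pool.shutdown()
    lock.synchronized(peak)
  }
}
```

With `maxInFlight = 200` the loop keeps the cluster busy with up to 200 partitions and only submits the next one when a slot frees up.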
I guess for 1000 billion partitions this would work out like so: if processing 10 million partitions takes 1 hour or so,
it would take about 50 hours on 50 nodes, which would cost about 250 euros.

OK, let's go with this method then.

-----------------------------------------------------------------------

I would also merge the partitions, e.g. down to 1% or 2% of the current partition count.

I would also check the available partitioning mechanisms, e.g.
studying the ontology structure further to define subject-, predicate- or object-based partition groupings other than partitionId.
I mean more semantically clustered data, based on the query patterns expected later.
But I am moving this task to tomorrow, and will now continue with machine learning tasks and the topology review.
I will continue the ontology DB tasks tomorrow by running the job on a 50-node cluster with this method,
expecting about 50 hours (nearly 2 days) of running time to finalize the initial ontology table in TTL structure, with a partitioning format that is still undecided.
I will restudy the ontology info tomorrow or another day to decide how to do that.

So now I stop this ontology DB study and continue with setting up a machine learning EC2 instance with PyTorch.
I need to start checking NLP methods, e.g. some PyTorch neural net implementation, or tools like spaCy,
to decide on the tooling for extracting the main dependency graph between the sentences of a paragraph.
And then it would need to be possible to reach out to other paragraphs, right?
That would be a very interesting design problem to solve.
It could be that sentences from the main parts of the first paragraph are taken and merged into the second paragraph, so that the dependency mapping of the second paragraph can be investigated.
I mean, when the neural net derives the dependency graph for all of paragraph 1, nice, but what about paragraph 2, which does not fit into the neural net's paragraph capacity?
So to keep correlating the dependency graphs of paragraphs continuously, I need to devise some ingenious methods, which would not be the best methods either.
I would both check the ontological relatedness of possible linkages between dependency graphs, and merge parts of a paragraph into the original paragraph to keep the dependency integration correct.

So it will be fun.
The rest of today: continue with setting up the EC2 instance with PyTorch and such libraries,
then run the dependency graph extractor on paragraphs to check it,
then continue the topology study; I might also need to quickly introduce myself to category theory.

Nice :)
