PyJava

This library is an ongoing effort to bring efficient data exchange between Java/Scala and Python. PyJava uses Apache Arrow as the data exchange format, which avoids serialization/deserialization between Java/Scala and Python and makes communication significantly faster than traditional approaches.

When you invoke Python code from the Java/Scala side, PyJava automatically starts Python workers, sends them the data, and collects the results once processing is done. The Python workers are reused
by default.

The initial code in this library comes from Apache Spark.

Install

Set up a Python (>= 3.6) environment (Conda is recommended):

pip uninstall pyjava && pip install pyjava
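
To verify the installation, a quick sanity check (a hypothetical snippet, not part of the official docs) is to import the package in the same environment:

# hypothetical sanity check: make sure pyjava is importable in the active environment
import pyjava
print(pyjava.__file__)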

Set up the Java environment (Maven is recommended):

For Scala 2.11/Spark 2.4.3

<dependency>
    <groupId>tech.mlsql</groupId>
    <artifactId>pyjava-2.4_2.11</artifactId>
    <version>0.3.2</version>
</dependency>

For Scala 2.12/Spark 3.1.1

<dependency>
    <groupId>tech.mlsql</groupId>
    <artifactId>pyjava-3.0_2.12</artifactId>
    <version>0.3.2</version>
</dependency>

Build Manually

Install Build Tool:

pip install mlsql_plugin_tool

Build for Spark 3.1.1:

mlsql_plugin_tool spark311
mvn clean install -DskipTests -Pdisable-java8-doclint -Prelease-sign-artifacts

Build for Spark 2.4.3:

mlsql_plugin_tool spark243
mvn clean install -DskipTests -Pdisable-java8-doclint -Prelease-sign-artifacts

Using a Python code snippet to process data in Java/Scala

With PyJava, you can run arbitrary Python code in your Java/Scala application.

val envs = new util.HashMap[String, String]()
// prepare python environment
envs.put(str(PythonConf.PYTHON_ENV), "source activate dev && export ARROW_PRE_0_15_IPC_FORMAT=1 ")

// describe the data that will be transferred to Python
val sourceSchema = StructType(Seq(StructField("value", StringType)))

val batch = new ArrowPythonRunner(
  Seq(ChainedPythonFunctions(Seq(PythonFunction(
    """
      |import pandas as pd
      |import numpy as np
      |
      |def process():
      |    for item in context.fetch_once_as_rows():
      |        item["value1"] = item["value"] + "_suffix"
      |        yield item
      |
      |context.build_result(process())
    """.stripMargin, envs, "python", "3.6")))), sourceSchema,
  "GMT", Map()
)

// prepare data
val sourceEncoder = RowEncoder.apply(sourceSchema).resolveAndBind()
val newIter = Seq(Row.fromSeq(Seq("a1")), Row.fromSeq(Seq("a2"))).map { irow =>
  sourceEncoder.toRow(irow).copy()
}.iterator

// run the code and get the return result
val javaContext = new JavaContext
val commonTaskContext = new AppContextImpl(javaContext, batch)
val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext)

// f.copy() is required because the underlying row object is reused by the iterator
columnarBatchIter.flatMap { batch =>
  batch.rowIterator.asScala
}.foreach(f => println(f.copy()))
javaContext.markComplete
javaContext.close
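
For reference, the embedded Python snippet above works with plain dictionaries: context.fetch_once_as_rows() yields each input row as a dict, and context.build_result() hands the transformed rows back to the Java side. The following standalone sketch simulates the same transformation outside the worker (the context object only exists inside the pyjava worker):

# standalone sketch of the transformation above; the input is simulated with a list of dicts
def process(rows):
    for item in rows:
        item["value1"] = item["value"] + "_suffix"
        yield item

simulated_input = [{"value": "a1"}, {"value": "a2"}]
for row in process(simulated_input):
    print(row)  # {'value': 'a1', 'value1': 'a1_suffix'}, then {'value': 'a2', 'value1': 'a2_suffix'}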

Using a Python code snippet to process data in Spark

val session = spark
import session.implicits._
val timezoneid = session.sessionState.conf.sessionLocalTimeZone
val df = session.createDataset[String](Seq("a1", "b1")).toDF("value")
val struct = df.schema
val abc = df.rdd.mapPartitions { iter =>
  val encoder = RowEncoder.apply(struct).resolveAndBind()
  val envs = new util.HashMap[String, String]()
  envs.put(str(PythonConf.PYTHON_ENV), "source activate streamingpro-spark-2.4.x")
  val batch = new ArrowPythonRunner(
    Seq(ChainedPythonFunctions(Seq(PythonFunction(
      """
        |import pandas as pd
        |import numpy as np
        |for item in data_manager.fetch_once():
        |    print(item)
        |df = pd.DataFrame({'AAA': [4, 5, 6, 7],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
        |data_manager.set_output([[df['AAA'],df['BBB']]])
      """.stripMargin, envs, "python", "3.6")))), struct,
    timezoneid, Map()
  )
  val newIter = iter.map { irow =>
    encoder.toRow(irow)
  }
  val commonTaskContext = new SparkContextImp(TaskContext.get(), batch)
  val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext)
  columnarBatchIter.flatMap { batch =>
    batch.rowIterator.asScala.map(_.copy)
  }
}

val wow = SparkUtils.internalCreateDataFrame(session, abc, StructType(Seq(StructField("AAA", LongType), StructField("BBB", LongType))), false)
wow.show()
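
The column list handed to data_manager.set_output needs to line up with the schema later passed to SparkUtils.internalCreateDataFrame (here AAA and BBB as longs). A standalone sketch of what the Python snippet hands back, runnable outside the worker (data_manager only exists inside the pyjava worker):

import pandas as pd

df = pd.DataFrame({'AAA': [4, 5, 6, 7], 'BBB': [10, 20, 30, 40], 'CCC': [100, 50, -30, -50]})
# one output "table" consisting of the AAA and BBB columns, matching
# StructType(Seq(StructField("AAA", LongType), StructField("BBB", LongType)))
output = [[df['AAA'], df['BBB']]]
print(output[0][0].tolist())  # [4, 5, 6, 7]
print(output[0][1].tolist())  # [10, 20, 30, 40]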

Run Python Project

With PyJava, you can point the system at a Python project and its entry point, and then run that project from Java/Scala.

"/tmp/data", "tempModelLocalPath" -> "/tmp/model" )) output.foreach(println) ">
import tech.mlsql.arrow.python.runner.PythonProjectRunner

val runner = new PythonProjectRunner("./pyjava/examples/pyproject1", Map())
val output = runner.run(Seq("bash", "-c", "source activate dev && python train.py"), Map(
  "tempDataLocalPath" -> "/tmp/data",
  "tempModelLocalPath" -> "/tmp/model"
))
output.foreach(println)
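
For reference, a hypothetical train.py for the example project might look like the sketch below; it assumes the map passed to runner.run is exposed to the script as environment variables (adjust this to however your project actually receives the paths):

# hypothetical train.py sketch for ./pyjava/examples/pyproject1
# assumes tempDataLocalPath / tempModelLocalPath arrive as environment variables
import os

data_path = os.environ.get("tempDataLocalPath", "/tmp/data")
model_path = os.environ.get("tempModelLocalPath", "/tmp/model")

print("reading training data from", data_path)
print("writing model artifacts to", model_path)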

Example in MLSQL

Non-Interactive Mode:

!python env "PYTHON_ENV=source activate streamingpro-spark-2.4.x";
!python conf "schema=st(field(a,long),field(b,long))";

select 1 as a as table1;

!python on table1 '''

import pandas as pd
import numpy as np
for item in data_manager.fetch_once():
    print(item)
df = pd.DataFrame({'AAA': [4, 5, 6, 8],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
data_manager.set_output([[df['AAA'],df['BBB']]])

''' named mlsql_temp_table2;

select * from mlsql_temp_table2 as output; 

Interactive Mode:

!python start;

!python env "PYTHON_ENV=source activate streamingpro-spark-2.4.x";
!python env "schema=st(field(a,integer),field(b,integer))";


!python '''
import pandas as pd
import numpy as np
''';

!python  '''
for item in data_manager.fetch_once():
    print(item)
df = pd.DataFrame({'AAA': [4, 5, 6, 8],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
data_manager.set_output([[df['AAA'],df['BBB']]])
''';
!python close;

Using PyJava as Arrow Server/Client

Java Server side:

val socketRunner = new SparkSocketRunner("wow", NetUtils.getHost, "Asia/Harbin")

val dataSchema = StructType(Seq(StructField("value", StringType)))
val encoder = RowEncoder.apply(dataSchema).resolveAndBind()
val newIter = Seq(Row.fromSeq(Seq("a1")), Row.fromSeq(Seq("a2"))).map { irow =>
  encoder.toRow(irow)
}.iterator
val javaContext = new JavaContext
val commonTaskContext = new AppContextImpl(javaContext, null)

val Array(_, host, port) = socketRunner.serveToStreamWithArrow(newIter, dataSchema, 10, commonTaskContext)
println(s"${host}:${port}")
Thread.currentThread().join()

Python Client side:

import os
import socket

from pyjava.serializers import \
    ArrowStreamPandasSerializer

out_ser = ArrowStreamPandasSerializer("Asia/Harbin", False, None)
HOST = ""   # fill in the host printed by the Java server
PORT = -1   # fill in the port printed by the Java server
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((HOST, PORT))
    buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
    infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)
    kk = out_ser.load_stream(infile)
    for item in kk:
        print(item)

Python Server side:

import os

import pandas as pd

os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
from pyjava.api.serve import OnceServer

ddata = pd.DataFrame(data=[[1, 2, 3, 4], [2, 3, 4, 5]])

server = OnceServer("127.0.0.1", 11111, "Asia/Harbin")
server.bind()
server.serve([{'id': 9, 'label': 1}])

Java Client side:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import tech.mlsql.arrow.python.iapp.{AppContextImpl, JavaContext}
import tech.mlsql.arrow.python.runner.SparkSocketRunner
import tech.mlsql.common.utils.network.NetUtils

val encoder = RowEncoder.apply(StructType(Seq(StructField("a", LongType), StructField("b", LongType)))).resolveAndBind()
val socketRunner = new SparkSocketRunner("wow", NetUtils.getHost, "Asia/Harbin")
val javaContext = new JavaContext
val commonTaskContext = new AppContextImpl(javaContext, null)
val iter = socketRunner.readFromStreamWithArrow("127.0.0.1", 11111, commonTaskContext)
iter.foreach(i => println(encoder.fromRow(i.copy())))
javaContext.close

How to configure the Python worker to run in Docker (TODO)
