#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A collection of builtin Avro functions."""

from typing import Dict, Optional, TYPE_CHECKING, cast

from pyspark.errors import PySparkTypeError
from pyspark.sql.column import Column
from pyspark.sql.utils import get_active_spark_context, try_remote_avro_functions
from pyspark.util import _print_missing_jar

if TYPE_CHECKING:
    from pyspark.sql._typing import ColumnOrName
@try_remote_avro_functions
def from_avro(
    data: "ColumnOrName", jsonFormatSchema: str, options: Optional[Dict[str, str]] = None
) -> Column:
    """
    Converts a binary column of Avro format into its corresponding catalyst value.
    The specified schema must match the read data, otherwise the behavior is undefined:
    it may fail or return an arbitrary result.
    To deserialize the data with a compatible and evolved schema, the expected Avro schema
    can be set via the option ``avroSchema``.

    .. versionadded:: 3.0.0

    .. versionchanged:: 3.5.0
        Supports Spark Connect.

    Parameters
    ----------
    data : :class:`~pyspark.sql.Column` or str
        the binary column.
    jsonFormatSchema : str
        the Avro schema in JSON string format.
    options : dict, optional
        options to control how the Avro record is parsed.

    Notes
    -----
    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
    application as per the deployment section of the "Apache Avro Data Source Guide".

    Examples
    --------
    >>> from pyspark.sql import Row
    >>> from pyspark.sql.avro.functions import from_avro, to_avro
    >>> data = [(1, Row(age=2, name='Alice'))]
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> avroDf = df.select(to_avro(df.value).alias("avro"))
    >>> avroDf.collect()
    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]

    >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
    ...     [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
    ...     "fields":[{"name":"age","type":["long","null"]},
    ...     {"name":"name","type":["string","null"]}]},"null"]}]}'''
    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
    [Row(value=Row(avro=Row(age=2, name='Alice')))]
    """
    from py4j.java_gateway import JVMView
    from pyspark.sql.classic.column import _to_java_column

    # Validate argument types eagerly so callers get a clear Python-side error
    # instead of an opaque Py4J failure.
    if not isinstance(data, (Column, str)):
        raise PySparkTypeError(
            error_class="INVALID_TYPE",
            message_parameters={
                "arg_name": "data",
                "arg_type": "pyspark.sql.Column or str",
            },
        )
    if not isinstance(jsonFormatSchema, str):
        raise PySparkTypeError(
            error_class="INVALID_TYPE",
            message_parameters={"arg_name": "jsonFormatSchema", "arg_type": "str"},
        )
    if options is not None and not isinstance(options, dict):
        raise PySparkTypeError(
            error_class="INVALID_TYPE",
            message_parameters={"arg_name": "options", "arg_type": "dict, optional"},
        )

    sc = get_active_spark_context()
    try:
        # Delegate to the JVM-side implementation of from_avro.
        jc = cast(JVMView, sc._jvm).org.apache.spark.sql.avro.functions.from_avro(
            _to_java_column(data), jsonFormatSchema, options or {}
        )
    except TypeError as e:
        if str(e) == "'JavaPackage' object is not callable":
            # The Avro data source JAR is not on the classpath; print a helpful hint.
            _print_missing_jar("Avro", "avro", "avro", sc.version)
        raise
    return Column(jc)
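# A minimal usage sketch, not part of the module itself: the ``options`` mapping
# accepts Avro data source options such as ``mode`` (assumed here; see the
# "Apache Avro Data Source Guide"). ``FAILFAST`` makes malformed records raise
# an error instead of being replaced by nulls under the default ``PERMISSIVE``
# mode.
#
#   parsed = avroDf.select(
#       from_avro(avroDf.avro, jsonFormatSchema, {"mode": "FAILFAST"}).alias("value")
#   )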
@try_remote_avro_functions
def to_avro(data: "ColumnOrName", jsonFormatSchema: str = "") -> Column:
    """
    Converts a column into binary of Avro format.

    .. versionadded:: 3.0.0

    .. versionchanged:: 3.5.0
        Supports Spark Connect.

    Parameters
    ----------
    data : :class:`~pyspark.sql.Column` or str
        the data column.
    jsonFormatSchema : str, optional
        user-specified output Avro schema in JSON string format.

    Notes
    -----
    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
    application as per the deployment section of the "Apache Avro Data Source Guide".

    Examples
    --------
    >>> from pyspark.sql import Row
    >>> from pyspark.sql.avro.functions import to_avro
    >>> data = ['SPADES']
    >>> df = spark.createDataFrame(data, "string")
    >>> df.select(to_avro(df.value).alias("suite")).collect()
    [Row(suite=bytearray(b'\\x00\\x0cSPADES'))]

    >>> jsonFormatSchema = '''["null", {"type": "enum", "name": "value",
    ...     "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}]'''
    >>> df.select(to_avro(df.value, jsonFormatSchema).alias("suite")).collect()
    [Row(suite=bytearray(b'\\x02\\x00'))]
    """
    from py4j.java_gateway import JVMView
    from pyspark.sql.classic.column import _to_java_column

    if not isinstance(data, (Column, str)):
        raise PySparkTypeError(
            error_class="INVALID_TYPE",
            message_parameters={
                "arg_name": "data",
                "arg_type": "pyspark.sql.Column or str",
            },
        )
    if not isinstance(jsonFormatSchema, str):
        raise PySparkTypeError(
            error_class="INVALID_TYPE",
            message_parameters={"arg_name": "jsonFormatSchema", "arg_type": "str"},
        )

    sc = get_active_spark_context()
    try:
        # An empty schema string means the Avro schema is derived from the column's
        # Catalyst type; otherwise the user-specified output schema is passed through.
        if jsonFormatSchema == "":
            jc = cast(JVMView, sc._jvm).org.apache.spark.sql.avro.functions.to_avro(
                _to_java_column(data)
            )
        else:
            jc = cast(JVMView, sc._jvm).org.apache.spark.sql.avro.functions.to_avro(
                _to_java_column(data), jsonFormatSchema
            )
    except TypeError as e:
        if str(e) == "'JavaPackage' object is not callable":
            # The Avro data source JAR is not on the classpath; print a helpful hint.
            _print_missing_jar("Avro", "avro", "avro", sc.version)
        raise
    return Column(jc)
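# A round-trip sketch, not part of the module itself: serialize a struct column
# with to_avro and decode it back with from_avro. ``df`` and ``jsonFormatSchema``
# are assumed to match the from_avro doctest above; note that from_avro also
# accepts a column name as a plain string.
#
#   roundtrip = (
#       df.select(to_avro(df.value).alias("avro"))
#       .select(from_avro("avro", jsonFormatSchema).alias("value"))
#   )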
def _test() -> None:
    import os
    import sys
    from pyspark.testing.utils import search_jar

    avro_jar = search_jar("connector/avro", "spark-avro", "spark-avro")

    if avro_jar is None:
        print(
            "Skipping all Avro Python tests as the optional Avro project was "
            "not compiled into a JAR. To run these tests, "
            "you need to build Spark with 'build/sbt -Pavro package' or "
            "'build/mvn -Pavro package' before running this test."
        )
        sys.exit(0)
    else:
        existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        jars_args = "--jars %s" % avro_jar
        os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])

    import doctest
    from pyspark.sql import SparkSession
    import pyspark.sql.avro.functions

    globs = pyspark.sql.avro.functions.__dict__.copy()
    spark = (
        SparkSession.builder.master("local[4]").appName("sql.avro.functions tests").getOrCreate()
    )
    globs["spark"] = spark
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.avro.functions,
        globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()
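# Running this module directly executes the doctests above against a local
# SparkSession, e.g. (path assumed for a Spark source checkout):
#
#   python python/pyspark/sql/avro/functions.py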