Efficient training of deep recommenders on cloud.

Overview

HybridBackend

Tensorflow 1.15 CPU CI Build Badge Documentation Status

Introduction

HybridBackend is a training framework for deep recommenders which bridges the gap between evolving cloud infrastructure and complex training process. See documentation for more information.

bridging

Installation

Install latest CPU version:

pip install hybridbackend-cpu  # tensorflow/tensorflow:1.15.5-py3

Contributing

We appreciate all contributions to improve HybridBackend. Please follow below steps to contribute:

1. Clone the repository and checkout a new branch.

git clone <git_repo_addr>
git pull -r
git checkout -b features/my_feature

2. Commit changes, check code style and test.

git commit
cibuild/run cibuild/format
cibuild/run cibuild/lint
cibuild/run make -j8
cibuild/run make test

3. Create pull request for code review.

Comments
  • Using shuffle or rebatch may cause OOM problem

    Using shuffle or rebatch may cause OOM problem

    1. Current behavior

    Using shuffle or rebatch may cause OOM problem.

    1.1 小文件测试记录

    total parquet file count: 15780

    total parquet file size: 126G

    total sample count: 600w

    No. | Test Scenarios | RAM Usage -- | -- | -- 1 | no shuffle no rebatch | use 50G ram stable 2 | shuffle,buffer_size=2048 rebatch,batch_size=8192 | start at 53G ram, and increasingly use more ram rapidly, exceed 94G ram limit within 2 minutes 3 | shuffle,buffer_size=8 rebatch,batch_size=8192 | increasingly use more ram slowly, exceed 94G ram limit after 2 hours 4 | no shuffle rebatch,batch_size=8192 | increasingly use more ram slowly, exceed 94G ram limit after 2 hours 5 | shuffle,buffer_size=8 no rebatch | use 50G ram stable at first, but increasingly use more ram after 1 hour

    image-20220303211253747

    image-20220303211236154

    image-20220303211125580

    image-20220303211314882

    image-20220303211215059

    1.2 大文件测试记录

    total parquet file count: 240

    total parquet file size: 126G

    单个parquet文件大小:500MB

    total sample count: 600w

    微信图片_20220323124551

    微信图片_20220323124557

    现象:每个epoch训练结束时,有明显内存回收的过程,但是回收不干净,导致每个epoch后使用的内存峰值越来越多,最终OOM。但是如果1、2个epoch内能训练完,内存不会爆。

    1.3 不同训练方式测试对比

    | | 新sdk | 旧sdk | | ------------ | ------------- | ------------------------------------------------------------ | | 是否内存溢出 | 是 | 否 | | 环境 | T4 单机单卡 | T4 单机单卡 | | 训练方式 | session.run() | tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec) | | 调用链 | 见下图1 | 见下图2 |

    微信图片_20220323124545

    微信图片_20220323124542

    2. Expected behavior

    During the training process, tensorflow should use stable amount of RAM, not using more and more RAM.

    3. System information

    • OS Platform and Distribution: Ubuntu 18.04.5 LTS
    • TensorFlow version: 1.15.0
    • Python version: 3.6
    • CUDA/cuDNN version: 10.1
    • RAM: 94G
    • GPU model and memory: Tesla T4, 16G

    4. Code to reproduce

    BATCH_SIZE = 8192
    
    parquet_file_list = ['some_parquet_file1.snappy.parquet', 'some_parquet_file2.snappy.parquet', ...]
    filenames_ds = tf.data.TFRecordDataset.from_tensor_slices(file_list)
    hb_fields = []
    hb_fields.append(hb.data.DataFrame.Field('int_field', tf.int64, ragged_rank=0))
    hb_fields.append(hb.data.DataFrame.Field('float_field', tf.float32, ragged_rank=0))
    hb_fields.append(hb.data.DataFrame.Field('array_field', tf.float32, ragged_rank=1))   # ... and some anthor fields
    
    
    # 1. no shuffle, no rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    
    # 2. big shuffle and rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds.shuffle(2048)
    ds.apply(hb.data.rebatch(BATCH_SIZE, fields=fields))
    
    # 3. small shuffle and rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds.shuffle(8)
    ds.apply(hb.data.rebatch(BATCH_SIZE, fields=fields))
    
    # 4. no shuffle and rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds.apply(hb.data.rebatch(BATCH_SIZE, fields=fields))
    
    # 5. small shuffle and no rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds.shuffle(8)
    

    Willing to contribute

    Yes

    opened by liurcme 5
  • Question: Data Loading Performance with 150G Byte/s

    Question: Data Loading Performance with 150G Byte/s

    Hi, thanks for open source this project,it's a great job!🥂 🍻

    I saw the Data Loading doc here,the ParquetDataset is to solve IO performance issues on cloud.

    According to the doc, the speed of reading and decoding ParquetDataset is about 150G Byte/s (3346.10MB/21.67ms) , equals max throughput of 12X 100G bit/s NIC, it's nearly impossible on cloud(hdfs/oss/s3).

    File Format | Size (MB) | Framework | #Threads | Elapsed (ms) ----------- | --------- | ------------- | -------- | ------------ CSV | 11062.61 | Tensorflow | 1 | 8558.38 Parquet | 3346.10 | Tensorflow IO | 1 | 103056.71 Parquet | 3346.10 | HybridBackend | 1 | 397.88 Parquet | 3346.10 | HybridBackend | 20 | 21.67

    Is it convenient to provide details of test environment? Apart from code of Dataset module, will HybridBackend engine code be released in the future?

    Thanks 🥂 🍻

    opened by neuzxy 4
  • Feature Request: Supports prefetching data to GPU

    Feature Request: Supports prefetching data to GPU

    User Story

    As a recommender system engineer, I want to read large batch of tabular data on GPU efficiently, so that training performance of large deep recommenders can be improved.

    Detailed requirements

    • It should be easy to use with TensorFlow Dataset API

    API Compatibility

    • Only new APIs should be introduced.

    Willing to contribute

    Yes

    enhancement 
    opened by 2sin18 3
  • What version of snappy should I install for building HB from source?

    What version of snappy should I install for building HB from source?

    Summary

    I am trying to build HB from source, when i use the make -j8 command from the work dir, i get the following error :

    (base) [email protected]:/HybridBackend# make -j8
    mkdir -p /root/projects/tmp/HybridBackend/arrow/build/
    ARROW_INSTALL=/root/projects/tmp/HybridBackend/arrow/dist \
    ARROW_BUILD=/root/projects/tmp/HybridBackend/arrow/build \
    ARROW_OSX_TARGET= \
    USE_CXX11_ABI=0 \
    WITH_ARROW_HDFS=ON \
    WITH_ARROW_S3=ON \
    SIMD_LEVEL=AVX2 \
    OS=Linux \
    bash arrow/build.sh
    -- Building using CMake version: 3.16.3
    -- Arrow version: 5.0.0 (full: '5.0.0')
    -- Arrow SO version: 500 (full: 500.0.0)
    -- clang-tidy not found
    -- clang-format not found
    -- Could NOT find ClangTools (missing: CLANG_FORMAT_BIN CLANG_TIDY_BIN)
    -- infer not found
    -- Found cpplint executable at /root/projects/tmp/HybridBackend/arrow/src/cpp/build-support/cpplint.py
    -- System processor: x86_64
    -- Arrow build warning level: PRODUCTION
    Using ld linker
    Configured for RELEASE build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})
    -- Build Type: RELEASE
    -- Using CONDA approach to find dependencies
    -- Using CONDA_PREFIX for ARROW_PACKAGE_PREFIX: /root/miniconda3
    -- Setting (unset) dependency *_ROOT variables: /root/miniconda3
    -- ARROW_ABSL_BUILD_VERSION: 0f3bb466b868b523cf1dc9b2aaaed65c77b28862
    -- ARROW_AWSSDK_BUILD_VERSION: 1.8.133
    -- ARROW_AWS_CHECKSUMS_BUILD_VERSION: v0.1.10
    -- ARROW_AWS_C_COMMON_BUILD_VERSION: v0.5.10
    -- ARROW_AWS_C_EVENT_STREAM_BUILD_VERSION: v0.1.5
    -- ARROW_BOOST_BUILD_VERSION: 1.75.0
    -- ARROW_BROTLI_BUILD_VERSION: v1.0.9
    -- ARROW_BZIP2_BUILD_VERSION: 1.0.8
    -- ARROW_CARES_BUILD_VERSION: 1.17.1
    -- ARROW_GBENCHMARK_BUILD_VERSION: v1.5.2
    -- ARROW_GFLAGS_BUILD_VERSION: v2.2.2
    -- ARROW_GLOG_BUILD_VERSION: v0.4.0
    -- ARROW_GRPC_BUILD_VERSION: v1.35.0
    -- ARROW_GTEST_BUILD_VERSION: 1.10.0
    -- ARROW_JEMALLOC_BUILD_VERSION: 5.2.1
    -- ARROW_LZ4_BUILD_VERSION: v1.9.3
    -- ARROW_MIMALLOC_BUILD_VERSION: v1.7.2
    -- ARROW_ORC_BUILD_VERSION: 1.6.6
    -- ARROW_PROTOBUF_BUILD_VERSION: v3.14.0
    -- ARROW_RAPIDJSON_BUILD_VERSION: 1a803826f1197b5e30703afe4b9c0e7dd48074f5
    -- ARROW_RE2_BUILD_VERSION: 2021-02-02
    -- ARROW_SNAPPY_BUILD_VERSION: 1.1.8
    -- ARROW_THRIFT_BUILD_VERSION: 0.13.0
    -- ARROW_THRIFT_BUILD_MD5_CHECKSUM: 38a27d391a2b03214b444cb13d5664f1
    -- ARROW_UTF8PROC_BUILD_VERSION: v2.6.1
    -- ARROW_XSIMD_BUILD_VERSION: e9234cd6e6f4428fc260073b2c34ffe86fda1f34
    -- ARROW_ZLIB_BUILD_VERSION: 1.2.11
    -- ARROW_ZSTD_BUILD_VERSION: v1.5.0
    -- Boost include dir: /usr/include
    -- Boost libraries: Boost::system;Boost::filesystem
    CMake Error at /usr/share/cmake-3.16/Modules/FindPackageHandleStandardArgs.cmake:146 (message):
      Could NOT find Snappy (missing: Snappy_LIB Snappy_INCLUDE_DIR)
    Call Stack (most recent call first):
      /usr/share/cmake-3.16/Modules/FindPackageHandleStandardArgs.cmake:393 (_FPHSA_FAILURE_MESSAGE)
      cmake_modules/FindSnappy.cmake:55 (find_package_handle_standard_args)
      cmake_modules/ThirdpartyToolchain.cmake:235 (find_package)
      cmake_modules/ThirdpartyToolchain.cmake:948 (resolve_dependency)
      CMakeLists.txt:515 (include)
    
    
    -- Configuring incomplete, errors occurred!
    See also "/root/projects/tmp/HybridBackend/arrow/build/CMakeFiles/CMakeOutput.log".
    See also "/root/projects/tmp/HybridBackend/arrow/build/CMakeFiles/CMakeError.log".
    make: *** [arrow/Makefile:8: /root/projects/tmp/HybridBackend/arrow/build/install_manifest.txt] Error 1
    
    

    I have install libsnappy-dev and can found it from /usr/local/include/snappy.h and /usr/local/lib/libsnappy.a, but that error still exists, so how should I install the correct snappy version for building HB?

    I also tryed the docker images from registry.cn-shanghai.aliyuncs.com/pai-dlc/hybridbackend:developer-tensorflow1.15-manylinux_2_27-py3.6-cu114, same error exists.

    Installation environment

    • GPU model and memory:
    • OS Platform: "20.04.3 LTS (Focal Fossa)"
    • Docker version:
    • GCC/CUDA/cuDNN version: 11.4
    • Python/conda version: Python 3.8.10/ conda 4.10.3
    • TensorFlow/PyTorch version: 1.15.5+deeprec2201

    Willing to contribute

    Yes

    opened by fuhailin 3
  • to_sparse failed for Value with ragged_rank > 1 read from parquet file

    to_sparse failed for Value with ragged_rank > 1 read from parquet file

    Current behavior

    when hb read some nested lists with ragged_rank > 1,the read Value cannot be transformed to SparseTensor by function hb.data.to_sparse.

    For example: dense_feature is one of the features read by hb.data.ParquetDataset, and to_sparse does not work for it. image

    Moreover, if I swap the order of the two nested_row_splits, then it can be to_sparse.

    image

    So maybe the order of the nested_row_splits when reading parquet file is incorrect?

    Expected behavior

    the Value read from parquet file can be transformed to SparseTensor.

    System information

    • GPU model and memory: No
    • OS Platform: Ubuntu
    • Docker version: No
    • GCC/CUDA/cuDNN version: 7.4/No/No
    • Python/conda version:3.6.13/4.13.0
    • TensorFlow/PyTorch version:1.14.0

    Code to reproduce

    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    dataset = hb.data.ParquetDataset("test2.zstd.parquet", batch_size=1)
    dataset = dataset.apply(hb.data.to_sparse())
    iterator = dataset.make_one_shot_iterator()
    next_element = iterator.get_next()
    sess = tf.Session()
    vals = sess.run(next_element)
    
    # One more simple demo:
    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    val = hb.data.dataframe.DataFrame.Value(values = np.array([1,2,3,4,5]), nested_row_splits=(np.array([0,1,3,4,5]), np.array([0,2,4])))
    sess = tf.Session()
    sess.run(val.to_sparse())
    

    Willing to contribute

    Yes

    bug 
    opened by SamJia 2
  • rebatch api produce an Check failed: limit <= dim0_size error

    rebatch api produce an Check failed: limit <= dim0_size error

    Current behavior

    After rebatch(), data iterator get_next() produce an error:

    F tensorflow/core/framework/tensor.cc:833] Check failed: limit <= dim0_size (8194 vs. 8193)
    

    Expected behavior

    no error

    System information

    • OS Platform and Distribution: Ubuntu 18.04.5 LTS
    • TensorFlow version: 1.15.0
    • Python version: 3.6
    • CUDA/cuDNN version: 10.1
    • RAM: 94G
    • GPU model and memory: Tesla T4, 16G

    Code to reproduce

    Step 1: Generate a parquet file by running following code

    import numpy as np
    import pandas as pd
    import random
    
    data_list = []
    for i in range(1, 10000):
        int_feature = random.randint(1, 100)
        # float_feature = random.random()
        array_feature = [random.randint(1, 10) for x in range(0, 4)]
        data_list.append([int_feature, array_feature])
    
    df = pd.DataFrame(data_list, columns=["int_feature", "array_feature"])
    df.to_parquet("parquet_sample_file.parquet")
    

    Step 2: Load generated parquet in step 1 by HybridBackend

    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    
    
    filenames_ds = tf.data.Dataset.from_tensor_slices(['file1.snappy.parquet', 'file2.snappy.parquet', ... 'fileN.snappy.parquet'])
    
    
    hb_fields = []
    hb_fields.append(hb.data.DataFrame.Field("feature1", tf.int64, ragged_rank=0))
    hb_fields.append(hb.data.DataFrame.Field("feature2", tf.float32, ragged_rank=1))
    hb_fields.append(hb.data.DataFrame.Field("feature3", tf.int64, ragged_rank=1))
    
    ds = filenames_ds.apply(hb.data.read_parquet(8192, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    iterator = ds.apply(hb.data.rebatch(8192, fields=hb_fields))
    
    it = iterator.make_one_shot_iterator()
    item = it.get_next()
    
    batch_size_dict = {}
    with tf.Session() as sess:
        print("======  start ======")
        total_batch_size = 0
        while True:
            try:
                batch = sess.run(item)
                batch_size = len(batch['mod_series'])
                batch_size_dict[batch_size] = batch_size_dict.get(batch_size, 0) + 1
            except tf.errors.OutOfRangeError:
                break
    
    

    Running above code in a pyhon3 shell, an error shall be thrown:

    F tensorflow/core/framework/tensor.cc:833] Check failed: limit <= dim0_size (8194 vs. 8193)
    

    Willing to contribute

    Yes

    bug 
    opened by liurcme 2
  • Bump tensorflow from 1.15.5 to 2.5.3 in /docs

    Bump tensorflow from 1.15.5 to 2.5.3 in /docs

    Bumps tensorflow from 1.15.5 to 2.5.3.

    Release notes

    Sourced from tensorflow's releases.

    TensorFlow 2.5.3

    Release 2.5.3

    Note: This is the last release in the 2.5 series.

    This releases introduces several vulnerability fixes:

    • Fixes a floating point division by 0 when executing convolution operators (CVE-2022-21725)
    • Fixes a heap OOB read in shape inference for ReverseSequence (CVE-2022-21728)
    • Fixes a heap OOB access in Dequantize (CVE-2022-21726)
    • Fixes an integer overflow in shape inference for Dequantize (CVE-2022-21727)
    • Fixes a heap OOB access in FractionalAvgPoolGrad (CVE-2022-21730)
    • Fixes an overflow and divide by zero in UnravelIndex (CVE-2022-21729)
    • Fixes a type confusion in shape inference for ConcatV2 (CVE-2022-21731)
    • Fixes an OOM in ThreadPoolHandle (CVE-2022-21732)
    • Fixes an OOM due to integer overflow in StringNGrams (CVE-2022-21733)
    • Fixes more issues caused by incomplete validation in boosted trees code (CVE-2021-41208)
    • Fixes an integer overflows in most sparse component-wise ops (CVE-2022-23567)
    • Fixes an integer overflows in AddManySparseToTensorsMap (CVE-2022-23568)
    • Fixes a number of CHECK-failures in MapStage (CVE-2022-21734)
    • Fixes a division by zero in FractionalMaxPool (CVE-2022-21735)
    • Fixes a number of CHECK-fails when building invalid/overflowing tensor shapes (CVE-2022-23569)
    • Fixes an undefined behavior in SparseTensorSliceDataset (CVE-2022-21736)
    • Fixes an assertion failure based denial of service via faulty bin count operations (CVE-2022-21737)
    • Fixes a reference binding to null pointer in QuantizedMaxPool (CVE-2022-21739)
    • Fixes an integer overflow leading to crash in SparseCountSparseOutput (CVE-2022-21738)
    • Fixes a heap overflow in SparseCountSparseOutput (CVE-2022-21740)
    • Fixes an FPE in BiasAndClamp in TFLite (CVE-2022-23557)
    • Fixes an FPE in depthwise convolutions in TFLite (CVE-2022-21741)
    • Fixes an integer overflow in TFLite array creation (CVE-2022-23558)
    • Fixes an integer overflow in TFLite (CVE-2022-23559)
    • Fixes a dangerous OOB write in TFLite (CVE-2022-23561)
    • Fixes a vulnerability leading to read and write outside of bounds in TFLite (CVE-2022-23560)
    • Fixes a set of vulnerabilities caused by using insecure temporary files (CVE-2022-23563)
    • Fixes an integer overflow in Range resulting in undefined behavior and OOM (CVE-2022-23562)
    • Fixes a vulnerability where missing validation causes tf.sparse.split to crash when axis is a tuple (CVE-2021-41206)
    • Fixes a CHECK-fail when decoding resource handles from proto (CVE-2022-23564)
    • Fixes a CHECK-fail with repeated AttrDef (CVE-2022-23565)
    • Fixes a heap OOB write in Grappler (CVE-2022-23566)
    • Fixes a CHECK-fail when decoding invalid tensors from proto (CVE-2022-23571)
    • Fixes an unitialized variable access in AssignOp (CVE-2022-23573)
    • Fixes an integer overflow in OpLevelCostEstimator::CalculateTensorSize (CVE-2022-23575)
    • Fixes an integer overflow in OpLevelCostEstimator::CalculateOutputSize (CVE-2022-23576)
    • Fixes a null dereference in GetInitOp (CVE-2022-23577)
    • Fixes a memory leak when a graph node is invalid (CVE-2022-23578)
    • Fixes an abort caused by allocating a vector that is too large (CVE-2022-23580)
    • Fixes multiple CHECK-failures during Grappler's IsSimplifiableReshape (CVE-2022-23581)
    • Fixes multiple CHECK-failures during Grappler's SafeToRemoveIdentity (CVE-2022-23579)
    • Fixes multiple CHECK-failures in TensorByteSize (CVE-2022-23582)
    • Fixes multiple CHECK-failures in binary ops due to type confusion (CVE-2022-23583)

    ... (truncated)

    Changelog

    Sourced from tensorflow's changelog.

    Release 2.5.3

    This releases introduces several vulnerability fixes:

    • Fixes a floating point division by 0 when executing convolution operators (CVE-2022-21725)
    • Fixes a heap OOB read in shape inference for ReverseSequence (CVE-2022-21728)
    • Fixes a heap OOB access in Dequantize (CVE-2022-21726)
    • Fixes an integer overflow in shape inference for Dequantize (CVE-2022-21727)
    • Fixes a heap OOB access in FractionalAvgPoolGrad (CVE-2022-21730)
    • Fixes an overflow and divide by zero in UnravelIndex (CVE-2022-21729)
    • Fixes a type confusion in shape inference for ConcatV2 (CVE-2022-21731)
    • Fixes an OOM in ThreadPoolHandle (CVE-2022-21732)
    • Fixes an OOM due to integer overflow in StringNGrams (CVE-2022-21733)
    • Fixes more issues caused by incomplete validation in boosted trees code (CVE-2021-41208)
    • Fixes an integer overflows in most sparse component-wise ops (CVE-2022-23567)
    • Fixes an integer overflows in AddManySparseToTensorsMap (CVE-2022-23568)
    • Fixes a number of CHECK-failures in MapStage (CVE-2022-21734)
    • Fixes a division by zero in FractionalMaxPool (CVE-2022-21735)
    • Fixes a number of CHECK-fails when building invalid/overflowing tensor shapes (CVE-2022-23569)
    • Fixes an undefined behavior in SparseTensorSliceDataset (CVE-2022-21736)
    • Fixes an assertion failure based denial of service via faulty bin count operations (CVE-2022-21737)
    • Fixes a reference binding to null pointer in QuantizedMaxPool (CVE-2022-21739)
    • Fixes an integer overflow leading to crash in SparseCountSparseOutput (CVE-2022-21738)
    • Fixes a heap overflow in SparseCountSparseOutput (CVE-2022-21740)
    • Fixes an FPE in BiasAndClamp in TFLite (CVE-2022-23557)
    • Fixes an FPE in depthwise convolutions in TFLite (CVE-2022-21741)

    ... (truncated)

    Commits
    • 959e9b2 Merge pull request #54213 from tensorflow/fix-sanity-on-r2.5
    • d05fcbc Fix sanity build
    • f2526a0 Merge pull request #54205 from tensorflow/disable-flaky-tests-on-r2.5
    • a5f94df Disable flaky test
    • 7babe52 Merge pull request #54201 from tensorflow/cherrypick-510ae18200d0a4fad797c0bf...
    • 0e5d378 Set Env Variable to override Setuptools new behavior
    • fdd4195 Merge pull request #54176 from tensorflow-jenkins/relnotes-2.5.3-6805
    • 4083165 Update RELEASE.md
    • a2bb7f1 Merge pull request #54185 from tensorflow/cherrypick-d437dec4d549fc30f9b85c75...
    • 5777ea3 Update third_party/icu/workspace.bzl
    • Additional commits viewable in compare view

    Dependabot compatibility score

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    • @dependabot use these labels will set the current labels as the default for future PRs for this repo and language
    • @dependabot use these reviewers will set the current reviewers as the default for future PRs for this repo and language
    • @dependabot use these assignees will set the current assignees as the default for future PRs for this repo and language
    • @dependabot use this milestone will set the current milestone as the default for future PRs for this repo and language

    You can disable automated security fix PRs for this repo from the Security Alerts page.

    dependencies 
    opened by dependabot[bot] 2
  • Question: When to release the code?

    Question: When to release the code?

    Hi, I saw the code is not public according to the architecture doc here https://hybridbackend.readthedocs.io/en/latest/architecture.html

    Do you have a plan to open source it? Or is it just focused on data io?

    I'd appreciate it if anyone could help me.

    Thanks :clinking_glasses: :beers:

    opened by gaocegege 2
  • tf.keras.layers.DenseFeatures  api as the candidate of hb.feature_column.DenseFeatures can not work with tf.feature_column.shared_embedding_columns

    tf.keras.layers.DenseFeatures api as the candidate of hb.feature_column.DenseFeatures can not work with tf.feature_column.shared_embedding_columns

    Current behavior

    HB version: HybridBackend 0.7.0-e277c15f3843f98901f0795bc9b7d0768056d5a3; tf1.15.5-v1.15.5+nv22.06-0-g55be3962f8; g++ 7.5.0; CUDA 11.4 (70,75,80,86) new hb package removes the hb.feature_column.DenseFeatures api. However, tf.keras.layers.DenseFeatures api can not deal with tf.feature_column.shared_embedding_columns.

    Expected behavior

    I want a new way to run the code successfully without hb.feature_column.DenseFeatures.

    System information

    • GPU model and memory:
    • OS Platform:
    • Docker version:
    • GCC/CUDA/cuDNN version:
    • Python/conda version:
    • TensorFlow/PyTorch version:

    Code to reproduce

    Willing to contribute

    Yes

    opened by taoyun951753 1
  • hb.keras.Model's fit() func support dataset multiple labels

    hb.keras.Model's fit() func support dataset multiple labels

    User Story

    I want to train model using multiple labels data, but fit function throw Exception like Error when checking model target:expected no data....

    When the dataset only contain one label, It's OK.

    Detailed requirements

    • The dataset like this,the labels maybe tuple or dict:
    ds = hb.data.ParquetDataset(XXX)
    def map_fn(batch):
        labels = tuple([batch[l] for l in labels])
        features = {}
        #pass
        return features, labels
    ds = ds.map(map_fn)
    
    • The fit() like this:
    m.fit(
        x=train_ds,
        validation_data=valid_ds,
        #XXX
        verbose=0)
    

    I wish fit() can support dataset like above.

    API Compatibility

    hb.keras.Model's fit()

    Willing to contribute

    Yes

    opened by karterotte 1
  • support keras fit history in estimator's train_and_evaluate

    support keras fit history in estimator's train_and_evaluate

    User Story

    I want to hold a record of the loss values and metric values during training, like keras History object: https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/History https://keras.io/guides/training_with_built_in_methods/ image

    Detailed requirements

    I have to decide saving or not models depends on it's metrics(maybe lastest one).

    API Compatibility

    hb.estimator.train_and_evaluate

    Willing to contribute

    Yes

    enhancement 
    opened by karterotte 1
  • How to realize gradient truncation function in HB Pkg

    How to realize gradient truncation function in HB Pkg

    Current behavior

    Recently, when I trained the model, the loss function got a Nan value. I want to perform gradient truncation. However HB can only use tf.train.XxxOptimizer api, which is used with computes_ gradients, tf.clip_ by_ value(), tf.clip_ by_ Norm() . But now HB uses tf.keras compile, fit mode. The community tf.keras.optimizers Adam () supports the clipvalue/clipnorm parameter .

    Expected behavior

    I want the HB pkg can support the gradient truncation function.

    System information

    • GPU model and memory:
    • OS Platform:
    • Docker version:
    • GCC/CUDA/cuDNN version:
    • Python/conda version:
    • TensorFlow/PyTorch version:

    Code to reproduce

    Willing to contribute

    Yes

    opened by taoyun951753 0
  • feature_column bucket_size is 6, use 8 gpus,  then worker-5 and worker-6 'save/RestoreV2' failed

    feature_column bucket_size is 6, use 8 gpus, then worker-5 and worker-6 'save/RestoreV2' failed

    feature_column bucket_size is 6, use 8 gpus, then worker-5 and worker-6 'save/RestoreV2' failed; backtrace: Traceback (most recent call last): File "neg_feedback_multi.py", line 1252, in tf.app.run() File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/platform/app.py", line 40, in run _run(main=main, argv=argv, flags_parser=_parse_flags_tolerate_undef) File "/home/pai/lib/python3.6/site-packages/absl/app.py", line 308, in run _run_main(main, args) File "/home/pai/lib/python3.6/site-packages/absl/app.py", line 254, in _run_main sys.exit(main(argv)) File "neg_feedback_multi.py", line 1235, in main model.run() File "neg_feedback_multi.py", line 1227, in run classifier.train_and_evaluate(train_spec, eval_spec) File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/estimator/estimator.py", line 276, in train_and_evaluate return executor.run() File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/training.py", line 640, in run getattr(self, task_to_run)() File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/training.py", line 650, in run_worker return self._start_distributed_training() File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/training.py", line 796, in _start_distributed_training saving_listeners=saving_listeners) File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/estimator/estimator.py", line 188, in train saving_listeners=saving_listeners) File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/estimator.py", line 370, in train loss = self._train_model(input_fn, hooks, saving_listeners) File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/estimator.py", line 1161, in _train_model return self._train_model_default(input_fn, hooks, saving_listeners) File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/estimator.py", line 1195, in _train_model_default saving_listeners) File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/estimator.py", line 1490, in _train_with_estimator_spec log_step_count_steps=log_step_count_steps) as mon_sess: File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/training/session.py", line 131, in HybridBackendMonitoredTrainingSession sess = fn(*args, **kwargs) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 678, in MonitoredTrainingSession stop_grace_period_secs=stop_grace_period_secs) File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/training/session.py", line 64, in init session_creator, hooks, should_recover=True, **kwargs) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 827, in init self._sess = _RecoverableSession(self._coordinated_creator) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 1309, in init _WrappedSession.init(self, self._create_session()) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 1314, in _create_session return self._sess_creator.create_session() File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 980, in create_session self.tf_sess = self._session_creator.create_session() File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 733, in create_session self._scaffold.finalize() File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 252, in finalize self._saver.build() File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 1059, in build self._build(self._filename, build_save=True, build_restore=True) File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/training/saver.py", line 258, in _build super()._build(*args, **kwargs) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 1137, in _build build_restore=build_restore) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 660, in _build_internal restore_sequentially, reshape) File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/training/saver.py", line 200, in _AddShardedRestoreOps filename_tensor, per_device, restore_sequentially, reshape) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 536, in _AddShardedRestoreOps name="restore_shard")) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 476, in _AddRestoreOps restore_sequentially) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 744, in bulk_restore return io_ops.restore_v2(filename_tensor, names, slices, dtypes) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/ops/gen_io_ops.py", line 2380, in restore_v2 name=name) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/op_def_library.py", line 794, in _apply_op_helper op_def=op_def) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/util/deprecation.py", line 507, in new_func return func(*args, **kwargs) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/ops.py", line 3360, in create_op attrs, op_def, compute_device) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/ops.py", line 3429, in _create_op_internal op_def=op_def) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/ops.py", line 1773, in init control_input_ops) File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/ops.py", line 1613, in _create_c_op raise ValueError(str(e)) ValueError: Expected non-negative start and positive length but got start = 6, length = 0: string = 6,0:0,10 for 'save/RestoreV2' (op: 'RestoreV2') with input shapes: [], [382], [382] and with computed input tensors: input[2] = <144150 23 108114,18018:0,23 144150 23 108114,18018:0,23 195

    opened by zhbhhb 0
  • the EarlyStopping callback not working well on multi worker distribute training job

    the EarlyStopping callback not working well on multi worker distribute training job

    Current behavior

    If there is only one worker ,training with EarlyStopping callback is ok. When multi workers with EarlyStopping callback doing distribute training, all workers will be hanging and waiting for synchronizing.

    09D96DCB-F298-4941-8C85-CDB56A5C0ABB

    Expected behavior

    I want the EarlyStopping callback works well not only on one worker task but also on multi workers distribute training job.

    System information

    • GPU model and memory:
    • OS Platform:
    • Docker version:
    • GCC/CUDA/cuDNN version:
    • Python/conda version:
    • TensorFlow/PyTorch version:

    Code to reproduce

    .... callbacks_list.append(EarlyStopping(monitor="val_loss", min_delta=self.ctx.min_delta, patience=self.ctx.patience, verbose=verbose, mode="min", baseline=None, restore_best_weights=True) )

    ....

    keras_model.fit( x=None, y=None, validation_data=valid_ds, steps_per_epoch=self.ctx.steps_per_epoch, validation_steps=self.ctx.valid_steps_per_epoch, epochs=self.ctx.callback_num, callbacks=callbacks_list, checkpoint_dir=self.ctx.model_save_path, keep_checkpoint_max=1, verbose=0)

    Willing to contribute

    Yes

    opened by taoyun951753 0
  • Dataset iterator can't be warpped in the hybridBackend scope

    Dataset iterator can't be warpped in the hybridBackend scope

    Current behavior

    I am using hybridBackend to do data parallelism, I create a dataset and make it an iterator, when I use hybridBackend scope to wrap the whole pipeline, an exception occurred after the iterator step, here is the error log:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 324, in _AssertCompatible
        fn(values)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 276, in _check_not_tensor
        _ = [_check_failed(v) for v in nest.flatten(values)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 277, in <listcomp>
        if isinstance(v, ops.Tensor)]
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 248, in _check_failed
        raise ValueError(v)
    ValueError: Tensor("Iterator_1/Identity:0", shape=(?,), dtype=int64, device=/job:chief/task:0/device:GPU:0)
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "demo.py", line 332, in <module>
        app.run(runner)
      File "/usr/local/lib/python3.6/dist-packages/absl/app.py", line 308, in run
        _run_main(main, args)
      File "/usr/local/lib/python3.6/dist-packages/absl/app.py", line 254, in _run_main
        sys.exit(main(argv))
      File "demo.py", line 213, in runner
        features, labels = datasource.iter.get_next()
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/data/iterators.py", line 120, in get_next
        DataSyncRewriting.accept(should_stop)
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/data/iterators.py", line 169, in accept
        should_stop = math_ops.cast(should_stop, dtypes.int32)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/util/dispatch.py", line 180, in wrapper
        return target(*args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/ops/math_ops.py", line 702, in cast
        x = ops.convert_to_tensor(x, name="x")
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/ops.py", line 1184, in convert_to_tensor
        return convert_to_tensor_v2(value, dtype, preferred_dtype, name)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/ops.py", line 1242, in convert_to_tensor_v2
        as_ref=False)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/ops.py", line 1297, in internal_convert_to_tensor
        ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/constant_op.py", line 286, in _constant_tensor_conversion_function
        return constant(v, dtype=dtype, name=name)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/constant_op.py", line 227, in constant
        allow_broadcast=True)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/constant_op.py", line 265, in _constant_impl
        allow_broadcast=allow_broadcast))
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 449, in make_tensor_proto
        _AssertCompatible(values, dtype)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 328, in _AssertCompatible
        raise TypeError("List of Tensors when single Tensor expected")
    TypeError: List of Tensors when single Tensor expected
    

    Expected behavior

    System information

    • GPU model and memory: Tesla P100
    • OS Platform: Ubuntu 18.04
    • Docker version: Docker Engine - Community Version: 20.10.14
    • GCC/CUDA/cuDNN version:
    • Python/conda version: Python 3.6.9
    • TensorFlow/PyTorch version: TensorFlow:DeepRec2208

    Code to reproduce

    import numpy as np
    import pandas as pd
    
    new_dtypes = {"uid": np.int64, "packagename": np.int64, "label_play": np.float64}
    
    train_df = pd.DataFrame(np.random.randint(0, 100, (5, 3)), columns=['uid', 'packagename', 'label_play'])
    train_df = train_df.astype(new_dtypes)
    train_df.to_parquet('train.parquet')
    
    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    from hybridbackend.tensorflow.data import ParquetDataset
    from tensorflow.python.data.ops import dataset_ops
    from tensorflow.python.data.experimental.ops.dataframe import to_sparse
    
    
    
    def parquet_map(record):
        for key in record:
            record[key] = tf.reshape(record[key], [-1])
        label = record.pop("label_play")
        return record, label
    
    
    # Create model
    def neural_net(features):
        with tf.device("/CPU:0"):
            var = tf.get_embedding_variable(
                "var_0",
                embedding_dim=3,
                initializer=tf.ones_initializer(tf.float32),
                partitioner=tf.fixed_size_partitioner(num_shards=4),
            )
    
        emb = tf.nn.embedding_lookup(var, features["uid"])
        fun = tf.multiply(emb, 2.0, name="multiply")
        loss = tf.reduce_sum(fun, name="reduce_sum")
        opt = tf.train.AdagradOptimizer(0.1)
    
        g_v = opt.compute_gradients(loss)
        train_op = opt.apply_gradients(g_v)
        return train_op, loss
    
    
    with hb.scope():
        with tf.device("/cpu:0"):
            dataset = tf.data.Dataset.list_files(["train.parquet"])
            dataset = dataset.apply(
                tf.data.experimental.parallel_interleave(
                    lambda tmp_file: ParquetDataset(
                        tmp_file,
                        drop_remainder=True,
                        batch_size=2,
                        num_parallel_reads=1,
                        fields=[
                            hb.data.DataFrame.Field("uid", tf.int64, ragged_rank=0),
                            hb.data.DataFrame.Field("packagename", tf.int64, ragged_rank=0),
                            hb.data.DataFrame.Field("label_play", tf.float64, ragged_rank=0),
                        ],
                    ).apply(
                        to_sparse()
                    ),
                    cycle_length=1,
                    block_length=1,
                )
            )
            dataset = dataset.batch(2, drop_remainder=True,).map(
                map_func=parquet_map,
                num_parallel_calls=dataset_ops.AUTOTUNE,
            )
        
        iterator = dataset.make_one_shot_iterator()
        # iterator = tf.data.make_one_shot_iterator(dataset)
        features, labels = iterator.get_next()
    
        train_op, loss = neural_net(features)
    
        scaffold = tf.train.Scaffold(
            init_op=tf.group(
                tf.global_variables_initializer(),
            ),
        )
    
        with tf.train.MonitoredTrainingSession(
            master="", scaffold=scaffold) as mon_sess:
            while not mon_sess.should_stop():
                _, ev = mon_sess.run([train_op, loss])
                print(ev)
    
    

    Willing to contribute

    Yes

    opened by fuhailin 0
  • error: Variables not initialized: communicator/1/HbNcclCommHandleOp

    error: Variables not initialized: communicator/1/HbNcclCommHandleOp

    Current behavior

    2022-10-19 12:39:39.948019: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
    2022-10-19 12:39:39.948020: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
    INFO:tensorflow:Parsing ../data//train.csv
    INFO:tensorflow:Parsing ../data//train.csv
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    INFO:tensorflow:Aggregate 12 dense gradients (33.35MB) and 0 sparse gradients (0.00MB), skip 26 aggregated gradients
    INFO:tensorflow:Aggregate 12 dense gradients (33.35MB) and 0 sparse gradients (0.00MB), skip 26 aggregated gradients
    2022-10-19 12:39:43.135528: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 3792945000 Hz
    2022-10-19 12:39:43.136209: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x53d08e0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
    2022-10-19 12:39:43.136227: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
    2022-10-19 12:39:43.137613: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
    2022-10-19 12:39:43.147796: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 3792945000 Hz
    2022-10-19 12:39:43.148600: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x4190950 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
    2022-10-19 12:39:43.148638: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
    2022-10-19 12:39:43.150144: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
    2022-10-19 12:39:43.263791: I tensorflow/stream_executor/cuda/cuda_driver.cc:404] Cuda add device primary context 0x6a91880
    2022-10-19 12:39:43.263977: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.264217: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x6a644c0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
    2022-10-19 12:39:43.264241: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 2080 Ti, Compute Capability 7.5
    2022-10-19 12:39:43.264400: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.264809: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1687] Found device 0 with properties: 
    name: NVIDIA GeForce RTX 2080 Ti major: 7 minor: 5 memoryClockRate(GHz): 1.635
    pciBusID: 0000:08:00.0
    2022-10-19 12:39:43.264837: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
    2022-10-19 12:39:43.267560: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
    2022-10-19 12:39:43.267587: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11
    2022-10-19 12:39:43.272210: I tensorflow/stream_executor/cuda/cuda_driver.cc:404] Cuda add device primary context 0x51f3c70
    2022-10-19 12:39:43.272373: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.272643: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x51bf3f0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
    2022-10-19 12:39:43.272666: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 2080 Ti, Compute Capability 7.5
    2022-10-19 12:39:43.272784: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.272986: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1687] Found device 0 with properties: 
    name: NVIDIA GeForce RTX 2080 Ti major: 7 minor: 5 memoryClockRate(GHz): 1.635
    pciBusID: 0000:07:00.0
    2022-10-19 12:39:43.273007: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
    2022-10-19 12:39:43.275711: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
    2022-10-19 12:39:43.275737: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11
    2022-10-19 12:39:43.288994: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
    2022-10-19 12:39:43.289162: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
    2022-10-19 12:39:43.289498: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.11
    2022-10-19 12:39:43.290069: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusparse.so.11
    2022-10-19 12:39:43.290150: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudnn.so.8
    2022-10-19 12:39:43.290231: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.290471: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.290643: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1815] Adding visible gpu devices: 0
    2022-10-19 12:39:43.292542: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1170] Device interconnect StreamExecutor with strength 1 edge matrix:
    2022-10-19 12:39:43.292558: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1176]      0 
    2022-10-19 12:39:43.292564: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1189] 0:   N 
    2022-10-19 12:39:43.292640: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.292843: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.293058: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1372] Created TensorFlow device (/job:worker/replica:0/task:0/device:GPU:0 with 9793 MB memory) -> physical GPU (device: 0, name: NVIDIA GeForce RTX 2080 Ti, pci bus id: 0000:08:00.0, compute capability: 7.5)
    2022-10-19 12:39:43.294188: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job chief -> {0 -> 127.0.0.1:20001}
    2022-10-19 12:39:43.294199: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> localhost:20002}
    2022-10-19 12:39:43.294986: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:374] Started server with target: grpc://localhost:20002
    2022-10-19 12:39:43.297217: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
    2022-10-19 12:39:43.297453: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
    2022-10-19 12:39:43.297814: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.11
    2022-10-19 12:39:43.298428: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusparse.so.11
    2022-10-19 12:39:43.298520: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudnn.so.8
    2022-10-19 12:39:43.298607: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.298845: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.299025: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1815] Adding visible gpu devices: 0
    2022-10-19 12:39:43.301007: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1170] Device interconnect StreamExecutor with strength 1 edge matrix:
    2022-10-19 12:39:43.301024: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1176]      0 
    2022-10-19 12:39:43.301031: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1189] 0:   N 
    2022-10-19 12:39:43.301114: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.301327: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.301552: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1372] Created TensorFlow device (/job:chief/replica:0/task:0/device:GPU:0 with 9729 MB memory) -> physical GPU (device: 0, name: NVIDIA GeForce RTX 2080 Ti, pci bus id: 0000:07:00.0, compute capability: 7.5)
    2022-10-19 12:39:43.302687: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job chief -> {0 -> localhost:20001}
    2022-10-19 12:39:43.302705: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> 127.0.0.1:20002}
    2022-10-19 12:39:43.303434: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:374] Started server with target: grpc://localhost:20001
    INFO:tensorflow:Graph was finalized.
    INFO:tensorflow:run without loading checkpoint
    INFO:tensorflow:Graph was finalized.
    INFO:tensorflow:run without loading checkpoint
    INFO:tensorflow:Running local_init_op.
    INFO:tensorflow:Running local_init_op.
    INFO:tensorflow:Done running local_init_op.
    INFO:tensorflow:Done running local_init_op.
    Using TensorFlow version 1.15.5
    Checking dataset...
    Numbers of training dataset is 8000000
    The training steps is 100
    Traceback (most recent call last):
      File "benchmark_hb.py", line 405, in <module>
        main()
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/function.py", line 144, in wrapped_fn
        return fn(*args, **kwargs)
      File "benchmark_hb.py", line 339, in main
        config=sess_config) as sess:
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 174, in HybridBackendMonitoredTrainingSession
        sess = fn(*args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 633, in MonitoredTrainingSession
        stop_grace_period_secs=stop_grace_period_secs)
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 69, in __init__
        session_creator, hooks, should_recover=True, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 775, in __init__
        self._sess = _RecoverableSession(self._coordinated_creator)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1257, in __init__
        _WrappedSession.__init__(self, self._create_session())
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1262, in _create_session
        return self._sess_creator.create_session()
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 928, in create_session
        self.tf_sess = self._session_creator.create_session()
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 697, in create_session
        init_fn=self._scaffold.init_fn)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/session_manager.py", line 323, in prepare_session
        (_maybe_name(init_op), init_fn, self._local_init_op, msg))
    RuntimeError: Init operations did not make model ready.  Init op: group_deps_2, init fn: None, local_init_op: name: "group_deps_1"
    op: "NoOp"
    input: "^group_deps_1/NoOp"
    input: "^group_deps_1/NoOp_1"
    device: "/job:chief/task:0/device:GPU:0"
    , error: Variables not initialized: communicator/0/HbNcclCommHandleOp
    Using TensorFlow version 1.15.5
    Checking dataset...
    Numbers of training dataset is 8000000
    The training steps is 100
    Traceback (most recent call last):
      File "benchmark_hb.py", line 405, in <module>
        main()
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/function.py", line 144, in wrapped_fn
        return fn(*args, **kwargs)
      File "benchmark_hb.py", line 339, in main
        config=sess_config) as sess:
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 174, in HybridBackendMonitoredTrainingSession
        sess = fn(*args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 633, in MonitoredTrainingSession
        stop_grace_period_secs=stop_grace_period_secs)
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 69, in __init__
        session_creator, hooks, should_recover=True, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 775, in __init__
        self._sess = _RecoverableSession(self._coordinated_creator)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1257, in __init__
        _WrappedSession.__init__(self, self._create_session())
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1262, in _create_session
        return self._sess_creator.create_session()
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 928, in create_session
        self.tf_sess = self._session_creator.create_session()
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 697, in create_session
        init_fn=self._scaffold.init_fn)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/session_manager.py", line 323, in prepare_session
        (_maybe_name(init_op), init_fn, self._local_init_op, msg))
    RuntimeError: Init operations did not make model ready.  Init op: group_deps_2, init fn: None, local_init_op: name: "group_deps_1"
    op: "NoOp"
    input: "^group_deps_1/NoOp"
    input: "^group_deps_1/NoOp_1"
    device: "/job:worker/task:0/device:GPU:0"
    , error: Variables not initialized: communicator/1/HbNcclCommHandleOp
    

    Expected behavior

    code run well

    System information

    • GPU model and memory: 2080Ti
    • OS Platform:
    • Docker version:
    • GCC/CUDA/cuDNN version: cuda 11.4
    • Python/conda version:
    • TensorFlow/PyTorch version: DeepRec, commit message: 6bca2cc4e6acaca3766e0425b53bdd

    Code to reproduce

    1. Download the train dataset(in csv format) from https://storage.googleapis.com/dataset-uploader/criteo-kaggle/large_version/train.csv
    2. The training script
    # Copyright (c) 2022 Intel Corporation
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # ==============================================================================
    
    from tensorflow.python.framework import dtypes
    import numpy as np
    from ast import arg
    import time
    import argparse
    import tensorflow as tf
    import os
    import sys
    import math
    import collections
    from tensorflow.python.client import timeline
    import json
    
    from tensorflow.python.framework import sparse_tensor
    from tensorflow.python.feature_column import feature_column_v2 as fc
    from tensorflow.python.ops import partitioned_variables
    from tensorflow.python.framework import ops
    os.environ["TF_GPU_THREAD_MODE"] = "global"
    import hybridbackend.tensorflow as hb
    
    # Set to INFO for tracking training, default is WARN. ERROR for least messages
    tf.logging.set_verbosity(tf.logging.INFO)
    print("Using TensorFlow version %s" % (tf.__version__))
    
    # Definition of some constants
    CONTINUOUS_COLUMNS = ['I' + str(i) for i in range(1, 14)]  # 1-13 inclusive
    CATEGORICAL_COLUMNS = ['C' + str(i) for i in range(1, 27)]  # 1-26 inclusive
    LABEL_COLUMN = ['clicked']
    TRAIN_DATA_COLUMNS = LABEL_COLUMN + CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
    FEATURE_COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
    HASH_BUCKET_SIZES = {
        'C1': 2500,
        'C2': 2000,
        'C3': 300000,
        'C4': 250000,
        'C5': 1000,
        'C6': 100,
        'C7': 20000,
        'C8': 4000,
        'C9': 20,
        'C10': 100000,
        'C11': 10000,
        'C12': 250000,
        'C13': 40000,
        'C14': 100,
        'C15': 100,
        'C16': 200000,
        'C17': 50,
        'C18': 10000,
        'C19': 4000,
        'C20': 20,
        'C21': 250000,
        'C22': 100,
        'C23': 100,
        'C24': 250000,
        'C25': 400,
        'C26': 100000
    }
    
    EMBEDDING_DIMENSIONS = {
        'C1': 64,
        'C2': 64,
        'C3': 128,
        'C4': 128,
        'C5': 64,
        'C6': 64,
        'C7': 64,
        'C8': 64,
        'C9': 64,
        'C10': 128,
        'C11': 64,
        'C12': 128,
        'C13': 64,
        'C14': 64,
        'C15': 64,
        'C16': 128,
        'C17': 64,
        'C18': 64,
        'C19': 64,
        'C20': 64,
        'C21': 128,
        'C22': 64,
        'C23': 64,
        'C24': 128,
        'C25': 64,
        'C26': 128
    }
    
    
    def transform_numeric(feature):
        r'''Transform numeric features.
        '''
        # Notes: Statistics of Kaggle's Criteo Dataset has been calculated in advance to save time.
        mins_list = [
            0.0, -3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
        ]
        range_list = [
            1539.0, 22069.0, 65535.0, 561.0, 2655388.0, 233523.0, 26297.0, 5106.0,
            24376.0, 9.0, 181.0, 1807.0, 6879.0
        ]
    
        def make_minmaxscaler(min, range):
            def minmaxscaler(col):
                return (col - min) / range
    
            return minmaxscaler
    
        numeric_list = []
    
        for column_name in CONTINUOUS_COLUMNS:
            normalizer_fn = None
            i = CONTINUOUS_COLUMNS.index(column_name)
            normalizer_fn = make_minmaxscaler(mins_list[i], range_list[i])
            numeric = normalizer_fn(feature[column_name])
            numeric_list.append(tf.reshape(numeric, shape=[-1, 1]))
        return numeric_list
    
    
    def transform_categorical(feature):
        r'''Transform categorical features.
        '''
        deep_features = []
        max_value = np.iinfo(dtypes.int64.as_numpy_dtype).max
    
        variables = []
        indices = []
        for column_name in CATEGORICAL_COLUMNS:
            ev_opt = tf.EmbeddingVariableOption(
                evict_option=None, filter_option=None)
            device_str = '/gpu'
            with tf.device(device_str), hb.scope(sharding=True):
                embedding_weights = tf.get_embedding_variable(
                    f'{column_name}_weight',
                    initializer=tf.random_normal_initializer(
                        mean=0.0, stddev=0.05
                    ),
                    embedding_dim=EMBEDDING_DIMENSIONS[column_name],
                    ev_option=ev_opt
                )
    
            category = tf.strings.to_hash_bucket_fast(
                feature[column_name], max_value)
            sparse_tensor = fc._to_sparse_input_and_drop_ignore_values(category)
            sparse_tensor = tf.sparse.reshape(sparse_tensor, (-1, 1))
            
            deep_features.append(tf.nn.embedding_lookup_sparse(
                embedding_weights, sparse_tensor, None))
            
            variables.append(embedding_weights)
            indices.append(sparse_tensor)
        return deep_features
    
    
    def stacked_dcn_v2(features, mlp_dims):
        r'''Stacked DCNv2.
    
        DCNv2: Improved Deep & Cross Network and Practical Lessons for Web-scale
        Learning to Rank Systems.
    
        See https://arxiv.org/abs/2008.13535 for more information.
        '''
        with tf.name_scope('cross'):
            cross_input = tf.concat(features, axis=-1)
            cross_input_shape = [-1, sum([f.shape[-1] for f in features])]
            cross_input = tf.reshape(cross_input, cross_input_shape)
            cross_input_sq = tf.layers.dense(
                cross_input, cross_input.shape[-1],
                activation=tf.nn.relu,
                kernel_initializer=tf.truncated_normal_initializer(),
                bias_initializer=tf.zeros_initializer())
            cross_output = cross_input * cross_input_sq + cross_input
            cross_output = tf.reshape(cross_output, [-1, cross_input.shape[1]])
            cross_output_dim = (len(features) * (len(features) + 1)) / 2
    
        with tf.name_scope('mlp'):
            prev_layer = cross_output
            prev_dim = cross_output_dim
            for i, d in enumerate(mlp_dims[:-1]):
                prev_layer = tf.layers.dense(
                    prev_layer, d,
                    activation=tf.nn.relu,
                    kernel_initializer=tf.random_normal_initializer(
                        mean=0.0,
                        stddev=math.sqrt(2.0 / (prev_dim + d))),
                    bias_initializer=tf.random_normal_initializer(
                        mean=0.0,
                        stddev=math.sqrt(1.0 / d)),
                    name=f'mlp_{i}')
                prev_dim = d
            return tf.layers.dense(
                prev_layer, mlp_dims[-1],
                activation=tf.nn.sigmoid,
                kernel_initializer=tf.random_normal_initializer(
                    mean=0.0,
                    stddev=math.sqrt(2.0 / (prev_dim + mlp_dims[-1]))),
                bias_initializer=tf.random_normal_initializer(
                    mean=0.0,
                    stddev=math.sqrt(1.0 / mlp_dims[-1])),
                name=f'mlp_{len(mlp_dims) - 1}')
    
    
    # generate dataset pipline
    def build_model_input(filename, batch_size, num_epochs):
        def parse_csv(value):
            tf.logging.info('Parsing {}'.format(filename))
            cont_defaults = [[0.0] for i in range(1, 14)]
            cate_defaults = [[' '] for i in range(1, 27)]
            label_defaults = [[0]]
            column_headers = TRAIN_DATA_COLUMNS
            record_defaults = label_defaults + cont_defaults + cate_defaults
            columns = tf.io.decode_csv(value, record_defaults=record_defaults)
            all_columns = collections.OrderedDict(zip(column_headers, columns))
            labels = all_columns.pop(LABEL_COLUMN[0])
            features = all_columns
            return features, labels
    
        '''Work Queue Feature'''
        if args.workqueue:
            from tensorflow.python.ops.work_queue import WorkQueue
            work_queue = WorkQueue([filename])
            # For multiple files:
            # work_queue = WorkQueue([filename, filename1,filename2,filename3])
            files = work_queue.input_dataset()
        else:
            files = filename
        # Extract lines from input files using the Dataset API.
        dataset = tf.data.TextLineDataset(files)
        dataset = dataset.shuffle(buffer_size=20000,
                                  seed=args.seed)  # fix seed for reproducing
        dataset = dataset.repeat(num_epochs)
        dataset = dataset.batch(batch_size)
        dataset = dataset.map(parse_csv, num_parallel_calls=28)
        dataset = dataset.prefetch(2)
        return dataset
    
    @hb.function()
    def main():
        
        # check dataset and count data set size
        print("Checking dataset...")
        train_file = args.data_location + '/train.csv'
        if (not os.path.exists(train_file)):
            print("Dataset does not exist in the given data_location.")
            sys.exit()
        no_of_training_examples = sum(1 for line in open(train_file))
        print("Numbers of training dataset is {}".format(no_of_training_examples))
    
        # set batch size, eporch & steps
        batch_size = args.batch_size
    
        if args.steps == 0:
            no_of_epochs = 1
            train_steps = math.ceil(
                (float(no_of_epochs) * no_of_training_examples) / batch_size)
        else:
            no_of_epochs = math.ceil(
                (float(batch_size) * args.steps) / no_of_training_examples)
            train_steps = args.steps
        print("The training steps is {}".format(train_steps))
    
        # set fixed random seed
        tf.set_random_seed(args.seed)
    
        # create data pipline of train & test dataset
        with tf.device('/cpu:0'):
            train_dataset = build_model_input(train_file, batch_size, no_of_epochs)
    
            iterator = tf.data.Iterator.from_structure(train_dataset.output_types,
                                                    train_dataset.output_shapes)
            next_element = iterator.get_next()
    
        train_init_op = iterator.make_initializer(train_dataset)
    
        # create feature column
        feature, labels = next_element[0], next_element[1]
    
        deep_features = transform_categorical(feature)
        wide_features = transform_numeric(feature)
        logits = stacked_dcn_v2(features=deep_features + wide_features,
                                mlp_dims=[1024, 1024, 512, 256, 1]
                                )
        loss = tf.reduce_mean(tf.keras.losses.binary_crossentropy(tf.reshape(labels, (-1, 1)), logits))
    
        step = tf.train.get_or_create_global_step()
        opt = tf.train.AdagradOptimizer(learning_rate=0.01)
        train_op = opt.minimize(loss, global_step=step)
    
        # Session config
        sess_config = tf.ConfigProto()
    
        # # Session hooks
        hooks = []
    
        # if args.smartstaged and not args.tf:
        #     '''Smart staged Feature'''
        #     next_element = tf.staged(next_element, num_threads=4, capacity=40)
        #     sess_config.graph_options.optimizer_options.do_smart_stage = True
        #     hooks.append(tf.make_prefetch_hook())
        # if args.op_fusion and not args.tf:
        #     '''Auto Graph Fusion'''
        #     sess_config.graph_options.optimizer_options.do_op_fusion = True
        # if args.micro_batch and not args.tf:
        #     '''Auto Mirco Batch'''
        #     sess_config.graph_options.optimizer_options.micro_batch_num = args.micro_batch
    
        scaffold = tf.train.Scaffold(
            local_init_op=tf.group(
                tf.local_variables_initializer(), train_init_op),
        )
    
        stop_hook = tf.train.StopAtStepHook(last_step=train_steps)
        log_hook = tf.train.LoggingTensorHook(
            {
                'steps': step,
                'loss': loss,
            }, every_n_iter=1)
        hooks.append(stop_hook)
        hooks.append(log_hook)
    
        with tf.train.MonitoredTrainingSession(
                master='',
                hooks=hooks,
                scaffold=scaffold,
                config=sess_config) as sess:
            while not sess.should_stop():
                print(sess.run([feature]))
                sess.run([loss, train_op])
        print("Training completed.")
    
    
    def boolean_string(string):
        low_string = string.lower()
        if low_string not in {'false', 'true'}:
            raise ValueError('Not a valid boolean string')
        return low_string == 'true'
    
    
    # Get parse
    def get_arg_parser():
        parser = argparse.ArgumentParser()
        parser.add_argument('--data_location',
                            help='Full path of train data',
                            required=False,
                            default='./data')
        parser.add_argument('--steps',
                            help='set the number of steps on train dataset',
                            type=int,
                            default=0)
        parser.add_argument('--batch_size',
                            help='Batch size to train. Default is 512',
                            type=int,
                            default=512)
        parser.add_argument('--seed',
                            help='set the random seed for tensorflow',
                            type=int,
                            default=2021)
        parser.add_argument('--workqueue',
                            help='Whether to enable Work Queue. Default to False.',
                            type=boolean_string,
                            default=False)
        return parser
    
    
    # Some DeepRec's features are enabled by ENV.
    # This func is used to set ENV and enable these features.
    # A triple quotes comment is used to introduce these features and play an emphasizing role.
    def set_env_for_DeepRec():
        '''
        Set some ENV for these DeepRec's features enabled by ENV. 
        More Detail information is shown in https://deeprec.readthedocs.io/zh/latest/index.html.
        START_STATISTIC_STEP & STOP_STATISTIC_STEP: On CPU platform, DeepRec supports memory optimization
            in both stand-alone and distributed trainging. It's default to open, and the 
            default start and stop steps of collection is 1000 and 1100. Reduce the initial 
            cold start time by the following settings.
        MALLOC_CONF: On CPU platform, DeepRec can use memory optimization with the jemalloc library.
            Please preload libjemalloc.so by `LD_PRELOAD=./libjemalloc.so.2 python ...`
        '''
        os.environ['START_STATISTIC_STEP'] = '100'
        os.environ['STOP_STATISTIC_STEP'] = '110'
        os.environ['MALLOC_CONF'] = \
            'background_thread:true,metadata_thp:auto,dirty_decay_ms:20000,muzzy_decay_ms:20000'
    
    
    if __name__ == '__main__':
        parser = get_arg_parser()
        args = parser.parse_args()
    
        set_env_for_DeepRec()
    
        main()
    
    1. Training command:
    python -m hybridbackend.run  python benchmark_hb.py --data_location ../data/ --steps 100 
    

    Willing to contribute

    Yes

    opened by shijieliu 0
  • Error when drop_reminder=True using rebatch API

    Error when drop_reminder=True using rebatch API

    Current behavior

    Using rebatch API with drop_reminder=True will make program exit with segmentation fault

    Expected behavior

    No error

    System information

    • GPU model and memory:
    • OS Platform: ubuntu 18
    • Docker version:
    • GCC/CUDA/cuDNN version:
    • Python/conda version: python 3.6
    • TensorFlow/PyTorch version: 1.5.0
    • HybridBackend version: 0.6.0a0

    Code to reproduce

    (1) First generate a random parquet file.

    import pandas as pd
    import random
    
    data_list = []
    for i in range(1, 100000):
        int_feature = random.randint(1, 1000)
        array_feature = [random.randint(1, 1000) for x in range(0, 50)]
        data_list.append([int_feature, array_feature, 0.8])
    
    df = pd.DataFrame(data_list, columns=["int_feature", "array_feature", "label"])
    df['label'] = pd.to_numeric(df["label"], downcast="float")
    df.to_parquet("parquet_sample_file.parquet")
    

    (2) Then read data

    import tensorflow as tf
    import tensorflow.keras as keras
    import hybridbackend.tensorflow as hb
    
    BATCH_SIZE = 1000
    
    
    def get_parquet_ds():
        filenames_ds = tf.data.Dataset.from_tensor_slices([
            'parquet_sample_file.parquet'
        ]*1)
        hb_fields = []
    
        def _map(elem):
            features = {
                "int_feature": tf.cast(tf.reshape(elem["int_feature"], [-1, 1]), dtype=tf.float32),
                "array_feature": tf.cast(tf.reshape(elem["array_feature"].values, [-1, 50]),
                                                  dtype=tf.float32)
            }
            labels = tf.reshape(elem["label"], [-1, 1])
            return features, labels
    
        hb_fields.append(hb.data.DataFrame.Field("int_feature", tf.int64, ragged_rank=0))
        hb_fields.append(hb.data.DataFrame.Field("array_feature", tf.int64, ragged_rank=1))
        hb_fields.append(hb.data.DataFrame.Field("label", tf.float32, ragged_rank=0))
        iterator = filenames_ds.apply(
            hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
        iterator = iterator.apply(hb.data.rebatch(BATCH_SIZE*2, fields=hb_fields, drop_remainder=True)).map(_map)
    
        return iterator
    
    
    def train():
        global_init_op = tf.compat.v1.global_variables_initializer()
    
        ds = get_parquet_ds()
        iterator = ds.make_one_shot_iterator()
        get_data_op = iterator.get_next()
    
        with tf.compat.v1.Session() as sess:
            a = sess.run([global_init_op])
            i = 1
            while True:
                try:
                    sample = sess.run([get_data_op])
    
                    f_category = sample[0][0]["int_feature"]
                    f_list = sample[0][0]["array_feature"]
                    labels_ = sample[0][1]
    
                    if i % 100 == 0:
                        print(f"step={i}")
                    i += 1
    
                except tf.errors.OutOfRangeError:
                    break
    
    
    if __name__ == '__main__':
        train()
    

    Willing to contribute

    Yes

    opened by liurcme 0
Releases(v0.7.0)
  • v0.7.0(Oct 21, 2022)

    Objectives:

    1. Memory-efficient loading of categorical data
    2. GPU-efficient orchestration of embedding layers
    3. Communication-efficient training and evaluation at scale
    4. Easy to use with existing AI workflows

    Features:

    1. Performance
    • Support of automatic embedding fusion on PAI DLC / PAI DSW
    • Support of data transfer prefetching
    1. Usability
    • Support of embedding_lookup_* API
    • Support of Keras Model API
    • Support direct pip install via Pypi
    Source code(tar.gz)
    Source code(zip)
  • v0.5.4(Jul 25, 2022)

    Objectives:

    • Easy to use with existing AI workflows

    Features:

    • Support fixed length list in ParquetDataset
    • Support schema parsing in ParquetDataset
    • Provide validation tools for parquet files

    Bug Fixes:

    • Fixes indices calculation in rebatching
    Source code(tar.gz)
    Source code(zip)
  • v0.6.0(Apr 16, 2022)

    Objectives:

    1. Communication-efficient training and evaluation at scale
    2. Easy to use with existing AI workflows

    Features:

    1. Data-Parallel Training and Evaluation
    • Bucketized Gradients Aggregation using AllReduce
    • Global Metric Operations
    • Out-Of-Range Coordination
    1. Hybrid-Parallel Embedding Learning
    • Bucketized Embedding Exchanging using AllToAllv
    • Fusion and Quantization of AllToAllv
    • Fusion of Partitioning and Stitching
    1. Usability
    • Support of MonitoredSession and Estimator
    • Declarative API for Model Definition
    1. Compatibility
    • Support of NVIDIA TensorFlow and DeepRec
    1. Interoperability
    • Inference Pipeline Needs No Change
    • Support of SavedModel
    • Support of Variable, XDL HashTable and PAI Embedding Variable

    Bug Fixes:

    [#46] Fixes rebatching in ParquetDataset.

    Source code(tar.gz)
    Source code(zip)
  • v0.5.3(Jul 25, 2022)

  • v0.5.2(Dec 2, 2021)

    Objectives:

    • Memory-efficient loading of categorical data
    • Easy to use with existing AI workflows

    Features:

    1. Parquet Dataset
    • Reading batch of tensors from numeric fields in zero-copy way
    • Reading batch of sparse tensors from numeric list fields in zero-copy way
    • Support of string fields
    • Support of local filesystem, HDFS, S3 and OSS
    1. Data Pipeline Functions
    • Resizing batch of tensors and ragged tensors
    • Converting ragged tensors to sparse tensors
    • Objective: "Easy to use with existing AI workflows"
    1. Compatibility
    • Support of TensorFlow 1.15 and Tensorflow 1.14
    • GitHub actions for uploading wheels to PyPI

    Bug Fixes:

    • [#11][#12][#13] Supports manylinux_2_24 platform.
    Source code(tar.gz)
    Source code(zip)
Owner
Alibaba
Alibaba Open Source
Alibaba
End-to-End Referring Video Object Segmentation with Multimodal Transformers

End-to-End Referring Video Object Segmentation with Multimodal Transformers This repo contains the official implementation of the paper: End-to-End Re

608 Dec 30, 2022
EDCNN: Edge enhancement-based Densely Connected Network with Compound Loss for Low-Dose CT Denoising

EDCNN: Edge enhancement-based Densely Connected Network with Compound Loss for Low-Dose CT Denoising By Tengfei Liang, Yi Jin, Yidong Li, Tao Wang. Th

workingcoder 115 Jan 05, 2023
An implementation of the 1. Parallel, 2. Streaming, 3. Randomized SVD using MPI4Py

PYPARSVD This implementation allows for a singular value decomposition which is: Distributed using MPI4Py Streaming - data can be shown in batches to

Romit Maulik 44 Dec 31, 2022
This is the code for "HyperNeRF: A Higher-Dimensional Representation for Topologically Varying Neural Radiance Fields".

HyperNeRF: A Higher-Dimensional Representation for Topologically Varying Neural Radiance Fields This is the code for "HyperNeRF: A Higher-Dimensional

Google 702 Jan 02, 2023
Using OpenAI's CLIP to upscale and enhance images

CLIP Upscaler and Enhancer Using OpenAI's CLIP to upscale and enhance images Based on nshepperd's JAX CLIP Guided Diffusion v2.4 Sample Results Viewpo

Tripp Lyons 5 Jun 14, 2022
Cours d'Algorithmique Appliquée avec Python pour BTS SIO SISR

Course: Introduction to Applied Algorithms with Python (in French) This is the source code of the website for the Applied Algorithms with Python cours

Loic Yvonnet 0 Jan 27, 2022
Code for "Long Range Probabilistic Forecasting in Time-Series using High Order Statistics"

Long Range Probabilistic Forecasting in Time-Series using High Order Statistics This is the code produced as part of the paper Long Range Probabilisti

16 Dec 06, 2022
Official code for Spoken ObjectNet: A Bias-Controlled Spoken Caption Dataset

Official code for our Interspeech 2021 - Spoken ObjectNet: A Bias-Controlled Spoken Caption Dataset [1]*. Visually-grounded spoken language datasets c

Ian Palmer 3 Jan 26, 2022
5 Jan 05, 2023
Semantic Scholar's Author Disambiguation Algorithm & Evaluation Suite

S2AND This repository provides access to the S2AND dataset and S2AND reference model described in the paper S2AND: A Benchmark and Evaluation System f

AI2 54 Nov 28, 2022
Fuwa-http - The http client implementation for the fuwa eco-system

Fuwa HTTP The HTTP client implementation for the fuwa eco-system Example import

Fuwa 2 Feb 16, 2022
A GUI to automatically create a TOPAS-readable MLC simulation file

Python script to create a TOPAS-readable simulation file descriring a Multi-Leaf-Collimator. Builds the MLC using the data from a 3D .stl file.

Sebastian Schäfer 0 Jun 19, 2022
Adaptive Prototype Learning and Allocation for Few-Shot Segmentation (CVPR 2021)

ASGNet The code is for the paper "Adaptive Prototype Learning and Allocation for Few-Shot Segmentation" (accepted to CVPR 2021) [arxiv] Overview data/

Gen Li 91 Dec 23, 2022
Nest Protect integration for Home Assistant. This will allow you to integrate your smoke, heat, co and occupancy status real-time in HA.

Nest Protect integration for Home Assistant Custom component for Home Assistant to interact with Nest Protect devices via an undocumented and unoffici

Mick Vleeshouwer 175 Dec 29, 2022
Rotation-Only Bundle Adjustment

ROBA: Rotation-Only Bundle Adjustment Paper, Video, Poster, Presentation, Supplementary Material In this repository, we provide the implementation of

Seong 51 Nov 29, 2022
This is a library for training and applying sparse fine-tunings with torch and transformers.

This is a library for training and applying sparse fine-tunings with torch and transformers. Please refer to our paper Composable Sparse Fine-Tuning f

Cambridge Language Technology Lab 37 Dec 30, 2022
Fastshap: A fast, approximate shap kernel

fastshap: A fast, approximate shap kernel fastshap was designed to be: Fast Calculating shap values can take an extremely long time. fastshap utilizes

Samuel Wilson 22 Sep 24, 2022
PyTorch implementation of DeepLab v2 on COCO-Stuff / PASCAL VOC

DeepLab with PyTorch This is an unofficial PyTorch implementation of DeepLab v2 [1] with a ResNet-101 backbone. COCO-Stuff dataset [2] and PASCAL VOC

Kazuto Nakashima 995 Jan 08, 2023
UniFormer - official implementation of UniFormer

UniFormer This repo is the official implementation of "Uniformer: Unified Transformer for Efficient Spatiotemporal Representation Learning". It curren

SenseTime X-Lab 573 Jan 04, 2023
LogAvgExp - Pytorch Implementation of LogAvgExp

LogAvgExp - Pytorch Implementation of LogAvgExp for Pytorch Install $ pip instal

Phil Wang 31 Oct 14, 2022