sql >> データベース >  >> NoSQL >> MongoDB

PySpark構造化ストリーミングを使用してKafkaストリームをMongoDBにシンクします

    解決策を見つけました。StructuredStreamingに適したMongoドライバーが見つからなかったため、別の解決策に取り組みました。次に、mongoDbへの直接接続を使用し、foreachbatch(の代わりに "foreach(...)"を使用します。 ..)。 testSpark.pyファイルの私のコードは次のようになります:

    ....
    import pymongo
    from pymongo import MongoClient
    
    local_url = "mongodb://localhost:27017"
    
    
    def write_machine_df_mongo(target_df):
    
        cluster = MongoClient(local_url)
        db = cluster["test_db"]
        collection = db.test1
    
        post = {
                "machine_id": target_df.machine_id,
                "proc_type": target_df.proc_type,
                "sensor1_id": target_df.sensor1_id,
                "sensor2_id": target_df.sensor2_id,
                "time": target_df.time,
                "sensor1_val": target_df.sensor1_val,
                "sensor2_val": target_df.sensor2_val,
                }
    
        collection.insert_one(post)
    
    machine_df.writeStream\
        .outputMode("append")\
        .foreach(write_machine_df_mongo)\
        .start()
    



    1. graphqlでマングースが必要ですか?

    2. mongodbグループとサブグループの数

    3. マングースの平均を計算する

    4. MongoDBで多くのドキュメントを挿入または更新します