and some example from python http server side implementing deltalake db service:  


 
    @retry(Exception, tries=5, delay=1, backoff=2)
    def retrieve_ongoing_game_data(self, username,  game_version):
        dt = DeltaTable(self.basecreatedb.usersdbpathforgame, storage_options=GameDB.so)
        uid = str(mmh3.hash128(username))
       
        ds = dt.to_pyarrow_dataset()

         
        tables = []

        condition = (dspy.field("userid") == uid) & (
        dspy.field("cur_game_version") == game_version)  
        batch_iter = ds.to_batches(filter=condition, batch_size=1)
        for batch in batch_iter:
            tables.append(batch.to_pandas())
            break
       
        if len(tables) > 0:
            return tables[0].to_dict('records')
        else:
            return None


    #
    def _register_ongoing_game(self, uid, gid, ts, game_version, websocket_url, is_creator=False):

        dfg = pl.DataFrame(
        {
            'cur_game_version': game_version,
            'userid': str(uid),
            'accomplishments': None,
            'ongoing_gameid':  str(gid),
            'last_joined_gameat': int(ts),
            'ongoing_game_started': None,
            'ongoing_game_finished': None,
            'ongoing_game_is_creator': is_creator,
            'ongoing_game_websocket_url':  websocket_url,
            'lastonline': int(ts),
        },
            schema={
                "cur_game_version": pl.String,
                "userid": pl.String,
                "accomplishments": pl.Array(pl.String, 0),
                "ongoing_gameid": pl.String,
                "last_joined_gameat": pl.Datetime('ms', time_zone="UTC"),
                "ongoing_game_started": pl.Boolean,
                "ongoing_game_finished": pl.Boolean,
                "ongoing_game_is_creator": pl.Boolean,
                'ongoing_game_websocket_url': pl.String,
                'lastonline': pl.Datetime('ms', time_zone="UTC"),
        })
        #dfg.show()
        wop = (dfg.write_delta(
        self.basecreatedb.usersdbpathforgame,
            mode='merge',
            delta_merge_options={
                'predicate': 'source.userid = target.userid and source.cur_game_version = target.cur_game_version ',
                'source_alias': 'source',
                'target_alias': 'target',
            },
        storage_options=GameDB.so)
        .when_matched_update(updates={
            "ongoing_gameid": "source.ongoing_gameid",
            "last_joined_gameat": "source.last_joined_gameat",
            "ongoing_game_websocket_url": "source.ongoing_game_websocket_url",
            "ongoing_game_is_creator": str(is_creator).lower()
        })
        .execute() #.when_matched_update_all()
        )
        if (wop["num_target_rows_updated"] <1):
            return False
        else:
            return True

    @retry(Exception, tries=5, delay=1, backoff=2)
    def _register_ongoing_game_with_retry(self, uid, gid, ts, game_version, websocket_url, is_creator=False):
        return self._register_ongoing_game( uid, gid, ts, game_version, websocket_url, is_creator)
   
    def try_join_game(self, username, g):
        try:
            uid = str(mmh3.hash128(username))
            df = pl.DataFrame(
                {
                    "year": g["year"],
                "month": g["month"],
                "day": g["day"],
                "is_joinable": g["is_joinable"],
                "game_version":g["game_version"],
                "remaining_capacity": g["remaining_capacity"],
                "token": g["token"],
                "userid": g["userid"],
                "hour": g["hour"],
                "minute":g["minute"],
                "second":g["second"],
                "gameid": g["gameid"],
                "current_joined_gamers": None,
                "createdat": None,
                "startedat": None,
                "finishedat":None,
                "is_started": None,
                "is_finished": None,
                "websocket_url": None,
                "allocated_capacity": None,
                "capacity": None,
                "won_team": None,
               
            },
                    schema={
                        "year": pl.Int32,
                "month": pl.Int32,
                "day": pl.Int32,
                "is_joinable": pl.Boolean,
                "game_version": pl.String,
                "remaining_capacity": pl.Int32,
                "token": pl.String,
                "hour": pl.Int32,
                "minute":pl.Int32,
                "second": pl.Int32,
                "userid": pl.String,
                "gameid": pl.String,
                "current_joined_gamers": pl.Array(pl.String, 0),
                "createdat": pl.Datetime('ms', time_zone="UTC"),
                "startedat": pl.Datetime('ms', time_zone="UTC"),
                "finishedat":pl.Datetime('ms', time_zone="UTC"),
                "is_started": pl.Boolean,
                "is_finished": pl.Boolean,
                "websocket_url": pl.String,
                "allocated_capacity": pl.Int32,
                "capacity": pl.Int32,
                "won_team": pl.String }
               
            )
           
            condition  = " and ".join([ "source.{} = target.{}".format(i, i) for i in ["year", "month" , "day",
                                "is_joinable", "game_version", "remaining_capacity", "token",
                                "userid", "hour", "minute","second",
                                "gameid"]])
           
            wop = (df.write_delta(
                self.basecreatedb.gamedbpath,
                mode='merge',
                delta_merge_options={
                    'predicate': condition ,
                    'source_alias': 'source',
                    'target_alias': 'target',
                },
            storage_options=GameDB.so)
            #.when_matched_update_all()
            .when_matched_update(updates={
                "is_joinable": str((g["remaining_capacity"]-1)>0).lower(),
                "remaining_capacity": str(g["remaining_capacity"]-1),
                "current_joined_gamers": "array_append(target.current_joined_gamers, '{}')".format(uid),
                "allocated_capacity" : "target.allocated_capacity+1",
                                        })
            .execute() #.when_matched_update_all()
            )

            if (wop["num_target_rows_updated"] <1):
                return False
            ts = datetime.now(timezone.utc).timestamp() *1000
       
            self._register_ongoing_game_with_retry(str(uid),  g["gameid"], int(ts), g["game_version"],  g["websocket_url"])
       
            return True
        except:
            return False

Yorumlar

Bu blogdaki popüler yayınlar

disgusting terrsts of foreign gypsies foreign terrorst grp/cult