SQLは表形式のデータ(または、そのように考えたい場合はリレーション)を処理して返しますが、すべてのSQLテーブルがリレーションであるとは限りません。これが意味するのは、質問に示されているようなネストされたテーブルはそれほど一般的な機能ではないということです。たとえば、JSONの配列やコンポジットを使用して、Postgresqlでこの種の何かを生成する方法はありますが、表形式のデータをフェッチしてアプリケーションでネストを実行することは完全に可能です。 Pythonにはitertools.groupby()
があります
、ソートされたデータを考えると、これは法案に非常によく適合します。
エラーcolumn "incoming.id" must appear in the GROUP BY clause...
選択リスト内の非集計、having句などは、GROUP BY
に表示される必要があると言っています。 句を使用するか、集計で使用します。不確定な値が存在する可能性があります 。つまり、 GROUP BY
であるため、値はグループ内のある行から選択する必要があります。 グループ化された行を1つの行に凝縮します 、そしてそれは彼らがどの行から選ばれたかは誰でも推測するでしょう。 SQLiteやMySQLが行っていたように、実装はこれを許可するかもしれませんが、SQL標準はそれを禁止しています。ルールの例外は、関数従属性
がある場合です。; GROUP BY
節は非集計を決定します。テーブル間の結合について考えてみてくださいA およびB Aでグループ化 の主キー。グループ内のどの行に関係なく、システムは Aの値を選択します の列は、主キーに基づいてグループ化が行われたため、同じになります。
3ポイントの一般的な意図されたアプローチに対処するために、1つの方法は、タイムスタンプ順に並べられた着信と発信の和集合を選択することです。 継承階層 がないため セットアップ-存在しないかもしれないので、私はアカウンティングに精通していません-コアとプレーンな結果タプルの使用に戻ると、この場合は簡単になります:
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
where(Outgoing.accountID == accountID)
all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)
次に、ネストされた構造を形成するためにitertools.groupby()
使用される:
date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]
最終結果は、日付の2タプルのリストと、エントリの辞書の昇順のリストです。 ORMソリューションとは言えませんが、仕事は終わります。例:
In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
...: timestamp=datetime.utcnow() - timedelta(days=i))
...: for i in range(3)])
...:
In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
...: timestamp=datetime.utcnow() - timedelta(days=i))
...: for i in range(3)])
...:
In [57]: session.commit()
In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
...: where(Incoming.accountID == 1)
...:
...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
...: where(Outgoing.accountID == 1)
...:
...: all_entries = incoming.union(outgoing)
...: all_entries = all_entries.order_by(all_entries.c.timestamp)
...: all_entries = db_session.execute(all_entries)
In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]:
[(datetime.date(2019, 9, 1),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 5,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 2),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 3),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 2,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
'type': 'outgoing'}])]
前述のように、PostgresqlはJSONの配列を使用する場合とほぼ同じ結果を生成できます:
from sqlalchemy.dialects.postgresql import aggregate_order_by
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
where(Outgoing.accountID == accountID)
all_entries = incoming.union(outgoing).alias('all_entries')
day = func.date_trunc('day', all_entries.c.timestamp)
stmt = select([day,
func.array_agg(aggregate_order_by(
func.row_to_json(literal_column('all_entries.*')),
all_entries.c.timestamp))]).\
group_by(day).\
order_by(day)
db_session.execute(stmt).fetchall()
実際にIncoming
の場合 およびOutgoing
Entry
など、共通ベースの子と考えることができます。 、ユニオンの使用は、コンクリートテーブル継承を使用して、いくらか自動化できます。
:
from sqlalchemy.ext.declarative import AbstractConcreteBase
class Entry(AbstractConcreteBase, Base):
pass
class Incoming(Entry):
__tablename__ = 'incoming'
id = Column(Integer, primary_key=True)
accountID = Column(Integer, ForeignKey('account.id'))
amount = Column(Float, nullable=False)
description = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP, nullable=False)
account = relationship("Account", back_populates="incomings")
__mapper_args__ = {
'polymorphic_identity': 'incoming',
'concrete': True
}
class Outgoing(Entry):
__tablename__ = 'outgoing'
id = Column(Integer, primary_key=True)
accountID = Column(Integer, ForeignKey('account.id'))
amount = Column(Float, nullable=False)
description = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP, nullable=False)
account = relationship("Account", back_populates="outgoings")
__mapper_args__ = {
'polymorphic_identity': 'outgoing',
'concrete': True
}
残念ながら、AbstractConcreteBase
configure_mappers()
を手動で呼び出す必要があります
必要なすべてのクラスが定義されたとき。この場合、最も早い可能性はUser
を定義した後です。 、Account
のため 関係を通じてそれに依存します:
from sqlalchemy.orm import configure_mappers
configure_mappers()
次に、すべてのIncoming
をフェッチするために およびOutgoing
単一のポリモーフィックORMクエリでは、Entry
を使用します :
session.query(Entry).\
filter(Entry.accountID == accountID).\
order_by(Entry.timestamp).\
all()
itertools.groupby()
の使用に進みます 結果のIncoming
のリストに上記のように およびOutgoing
。